/*
 * MinIO Cloud Storage, (C) 2020 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"path"
	"runtime/debug"
	"strings"
	"sync"
	"time"

	"github.com/klauspost/compress/s2"
	"github.com/minio/minio/cmd/logger"
	"github.com/minio/minio/pkg/console"
	"github.com/minio/minio/pkg/hash"
	"github.com/tinylib/msgp/msgp"
)

//go:generate msgp -file $GOFILE -unexported

// a bucketMetacache keeps track of all caches generated
// for a bucket.
type bucketMetacache struct {
	// Name of bucket
	bucket string

	// caches indexed by id.
	caches map[string]metacache
	// cache ids indexed by root paths
	cachesRoot map[string][]string `msg:"-"`

	// Internal state
	mu        sync.RWMutex `msg:"-"`
	updated   bool         `msg:"-"`
	transient bool         `msg:"-"` // bucket used for non-persisted caches.
}

// newBucketMetacache creates a new bucketMetacache.
// Optionally remove all existing caches.
func newBucketMetacache(bucket string, cleanup bool) *bucketMetacache {
	if cleanup {
		// Recursively delete all caches.
		objAPI := newObjectLayerFn()
		ez, ok := objAPI.(*erasureServerPools)
		if ok {
			ctx := context.Background()
			ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(bucket, slashSeparator))
		}
	}
	return &bucketMetacache{
		bucket:     bucket,
		caches:     make(map[string]metacache, 10),
		cachesRoot: make(map[string][]string, 10),
	}
}

func (b *bucketMetacache) debugf(format string, data ...interface{}) {
	if serverDebugLog {
		console.Debugf(format+"\n", data...)
	}
}

// loadBucketMetaCache will load the cache from the object layer.
// If the cache cannot be found a new one is created.
func loadBucketMetaCache(ctx context.Context, bucket string) (*bucketMetacache, error) {
	objAPI := newObjectLayerFn()
	for objAPI == nil {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
			time.Sleep(250 * time.Millisecond)
		}
		objAPI = newObjectLayerFn()
		if objAPI == nil {
			logger.LogIf(ctx, fmt.Errorf("loadBucketMetaCache: object layer not ready. bucket: %q", bucket))
		}
	}
	var meta bucketMetacache
	var decErr error
	var wg sync.WaitGroup
	wg.Add(1)

	r, w := io.Pipe()
	go func() {
		defer wg.Done()
		dec := s2DecPool.Get().(*s2.Reader)
		dec.Reset(r)
		decErr = meta.DecodeMsg(msgp.NewReader(dec))
		dec.Reset(nil)
		s2DecPool.Put(dec)
		r.CloseWithError(decErr)
	}()
	// Use global context for this.
	err := objAPI.GetObject(GlobalContext, minioMetaBucket, pathJoin("buckets", bucket, ".metacache", "index.s2"), 0, -1, w, "", ObjectOptions{})
	logger.LogIf(ctx, w.CloseWithError(err))
	wg.Wait()
	if err != nil {
		switch err.(type) {
		case ObjectNotFound:
			err = nil
		case InsufficientReadQuorum:
			// Cache is likely lost. Clean up and return new.
			return newBucketMetacache(bucket, true), nil
		default:
			logger.LogIf(ctx, err)
		}
		return newBucketMetacache(bucket, false), err
	}
	if decErr != nil {
		if errors.Is(err, context.Canceled) {
			return newBucketMetacache(bucket, false), err
		}
		// Log the error, but assume the data is lost and return a fresh bucket.
		// Otherwise a broken cache will never recover.
		logger.LogIf(ctx, decErr)
		return newBucketMetacache(bucket, true), nil
	}
	// Sanity check...
	if meta.bucket != bucket {
		logger.Info("loadBucketMetaCache: loaded cache name mismatch, want %s, got %s. Discarding.", bucket, meta.bucket)
		return newBucketMetacache(bucket, true), nil
	}
	meta.cachesRoot = make(map[string][]string, len(meta.caches)/10)
	// Index roots
	for id, cache := range meta.caches {
		meta.cachesRoot[cache.root] = append(meta.cachesRoot[cache.root], id)
	}
	return &meta, nil
}

// save the bucket cache to the object storage.
func (b *bucketMetacache) save(ctx context.Context) error {
	if b.transient {
		return nil
	}
	objAPI := newObjectLayerFn()
	if objAPI == nil {
		return errServerNotInitialized
	}

	// Keep lock while we marshal.
	// We need a write lock since we update 'updated'
	b.mu.Lock()
	if !b.updated {
		b.mu.Unlock()
		return nil
	}
	// Save as s2 compressed msgpack
	tmp := bytes.NewBuffer(make([]byte, 0, b.Msgsize()))
	enc := s2.NewWriter(tmp)
	err := msgp.Encode(enc, b)
	if err != nil {
		b.mu.Unlock()
		return err
	}
	err = enc.Close()
	if err != nil {
		b.mu.Unlock()
		return err
	}
	b.updated = false
	b.mu.Unlock()

	hr, err := hash.NewReader(tmp, int64(tmp.Len()), "", "", int64(tmp.Len()), false)
	if err != nil {
		return err
	}
	_, err = objAPI.PutObject(ctx, minioMetaBucket, pathJoin("buckets", b.bucket, ".metacache", "index.s2"), NewPutObjReader(hr, nil, nil), ObjectOptions{})
	logger.LogIf(ctx, err)
	return err
}

// findCache will attempt to find a matching cache for the provided options.
// If a cache with the same ID exists already it will be returned.
// If none can be found a new is created with the provided ID.
func (b *bucketMetacache) findCache(o listPathOptions) metacache {
	if b == nil {
		logger.Info("bucketMetacache.findCache: nil cache for bucket %s", o.Bucket)
		return metacache{}
	}

	if o.Bucket != b.bucket && !b.transient {
		logger.Info("bucketMetacache.findCache: bucket %s does not match this bucket %s", o.Bucket, b.bucket)
		debug.PrintStack()
		return metacache{}
	}

	extend := globalAPIConfig.getExtendListLife()

	// Grab a write lock, since we create one if we cannot find one.
	if o.Create {
		b.mu.Lock()
		defer b.mu.Unlock()
	} else {
		b.mu.RLock()
		defer b.mu.RUnlock()
	}

	// Check if exists already.
	if c, ok := b.caches[o.ID]; ok {
		b.debugf("returning existing %v", o.ID)
		return c
	}
	// No need to do expensive checks on transients.
	if b.transient {
		if !o.Create {
			return metacache{
				id:     o.ID,
				bucket: o.Bucket,
				status: scanStateNone,
			}
		}

		// Create new
		best := o.newMetacache()
		b.caches[o.ID] = best
		b.updated = true
		b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket)
		return best
	}

	var best metacache
	rootSplit := strings.Split(o.BaseDir, slashSeparator)
	for i := range rootSplit {
		interesting := b.cachesRoot[path.Join(rootSplit[:i+1]...)]

		for _, id := range interesting {
			cached, ok := b.caches[id]
			if !ok {
				continue
			}
			if !cached.matches(&o, extend) {
				continue
			}
			if cached.started.Before(best.started) {
				b.debugf("cache %s disregarded - we have a better", cached.id)
				// If we already have a newer, keep that.
				continue
			}
			best = cached
		}
	}
	if !best.started.IsZero() {
		if o.Create {
			best.lastHandout = UTCNow()
			b.caches[best.id] = best
			b.updated = true
		}
		b.debugf("returning cached %s, status: %v, ended: %v", best.id, best.status, best.ended)
		return best
	}
	if !o.Create {
		return metacache{
			id:     o.ID,
			bucket: o.Bucket,
			status: scanStateNone,
		}
	}

	// Create new and add.
	best = o.newMetacache()
	b.caches[o.ID] = best
	b.cachesRoot[best.root] = append(b.cachesRoot[best.root], best.id)
	b.updated = true
	b.debugf("returning new cache %s, bucket: %v", best.id, best.bucket)
	return best
}

// cleanup removes redundant and outdated entries.
func (b *bucketMetacache) cleanup() {
	// Entries to remove.
	remove := make(map[string]struct{})
	currentCycle := intDataUpdateTracker.current()

	// Test on a copy
	// cleanup is the only one deleting caches.
	caches, rootIdx := b.cloneCaches()

	for id, cache := range caches {
		if b.transient && time.Since(cache.lastUpdate) > 15*time.Minute && time.Since(cache.lastHandout) > 15*time.Minute {
			// Keep transient caches only for 15 minutes.
			remove[id] = struct{}{}
			continue
		}
		if !cache.worthKeeping(currentCycle) {
			b.debugf("cache %s not worth keeping", id)
			remove[id] = struct{}{}
			continue
		}
		if cache.id != id {
			logger.Info("cache ID mismatch %s != %s", id, cache.id)
			remove[id] = struct{}{}
			continue
		}
		if cache.bucket != b.bucket && !b.transient {
			logger.Info("cache bucket mismatch %s != %s", b.bucket, cache.bucket)
			remove[id] = struct{}{}
			continue
		}
	}

	// Check all non-deleted against eachother.
	// O(n*n), but should still be rather quick.
	for id, cache := range caches {
		if b.transient {
			break
		}
		if _, ok := remove[id]; ok {
			continue
		}

		interesting := interestingCaches(cache.root, rootIdx)
		for _, id2 := range interesting {
			if _, ok := remove[id2]; ok || id2 == id {
				// Don't check against one we are already removing
				continue
			}
			cache2, ok := caches[id2]
			if !ok {
				continue
			}

			if cache.canBeReplacedBy(&cache2) {
				b.debugf("cache %s can be replaced by %s", id, cache2.id)
				remove[id] = struct{}{}
				break
			} else {
				b.debugf("cache %s can be NOT replaced by %s", id, cache2.id)
			}
		}
	}

	for id := range remove {
		b.deleteCache(id)
	}
}

// Potentially interesting caches.
// Will only add root if request is for root.
func interestingCaches(root string, cachesRoot map[string][]string) []string {
	var interesting []string
	rootSplit := strings.Split(root, slashSeparator)
	for i := range rootSplit {
		want := path.Join(rootSplit[:i+1]...)
		interesting = append(interesting, cachesRoot[want]...)
	}
	return interesting
}

// updateCache will update a cache by id.
// If the cache cannot be found nil is returned.
// The bucket cache will be locked until the done .
func (b *bucketMetacache) updateCache(id string) (cache *metacache, done func()) {
	b.mu.Lock()
	c, ok := b.caches[id]
	if !ok {
		b.mu.Unlock()
		return nil, func() {}
	}
	return &c, func() {
		c.lastUpdate = UTCNow()
		b.caches[id] = c
		b.mu.Unlock()
	}
}

// updateCacheEntry will update a cache.
// Returns the updated status.
func (b *bucketMetacache) updateCacheEntry(update metacache) (metacache, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	existing, ok := b.caches[update.id]
	if !ok {
		logger.Info("updateCacheEntry: bucket %s list id %v not found", b.bucket, update.id)
		return update, errFileNotFound
	}
	existing.update(update)
	b.caches[update.id] = existing
	b.updated = true
	return existing, nil
}

// cloneCaches will return a clone of all current caches.
func (b *bucketMetacache) cloneCaches() (map[string]metacache, map[string][]string) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	dst := make(map[string]metacache, len(b.caches))
	for k, v := range b.caches {
		dst[k] = v
	}
	// Copy indexes
	dst2 := make(map[string][]string, len(b.cachesRoot))
	for k, v := range b.cachesRoot {
		tmp := make([]string, len(v))
		copy(tmp, v)
		dst2[k] = tmp
	}

	return dst, dst2
}

// getCache will return a clone of a specific metacache.
// Will return nil if the cache doesn't exist.
func (b *bucketMetacache) getCache(id string) *metacache {
	b.mu.RLock()
	c, ok := b.caches[id]
	b.mu.RUnlock()
	if !ok {
		return nil
	}
	return &c
}

// deleteAll will delete all on disk data for ALL caches.
// Deletes are performed concurrently.
func (b *bucketMetacache) deleteAll() {
	ctx := context.Background()
	ez, ok := newObjectLayerFn().(*erasureServerPools)
	if !ok {
		logger.LogIf(ctx, errors.New("bucketMetacache: expected objAPI to be *erasurePools"))
		return
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	b.updated = true
	if !b.transient {
		// Delete all.
		ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(b.bucket, slashSeparator))
		b.caches = make(map[string]metacache, 10)
		b.cachesRoot = make(map[string][]string, 10)
		return
	}

	// Transient are in different buckets.
	var wg sync.WaitGroup
	for id := range b.caches {
		wg.Add(1)
		go func(cache metacache) {
			defer wg.Done()
			ez.deleteAll(ctx, minioMetaBucket, metacachePrefixForID(cache.bucket, cache.id))
		}(b.caches[id])
	}
	wg.Wait()
	b.caches = make(map[string]metacache, 10)
}

// deleteCache will delete a specific cache and all files related to it across the cluster.
func (b *bucketMetacache) deleteCache(id string) {
	b.mu.Lock()
	c, ok := b.caches[id]
	if ok {
		// Delete from root map.
		list := b.cachesRoot[c.root]
		for i, lid := range list {
			if id == lid {
				list = append(list[:i], list[i+1:]...)
				break
			}
		}
		b.cachesRoot[c.root] = list
		delete(b.caches, id)
		b.updated = true
	}
	b.mu.Unlock()
	if ok {
		c.delete(context.Background())
	}
}