/*
 * Minio Cloud Storage, (C) 2016 Minio, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package cmd

import "sync"

// HealBucket recreates the bucket volume on every storage disk that is
// missing it. The bucket name must be valid and the bucket must already
// exist according to xl.isBucketExist, otherwise BucketNameInvalid or
// BucketNotFound is returned.
func (xl xlObjects) HealBucket(bucket string) error {
	// Reject malformed bucket names up front.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Healing only applies to buckets that already exist.
	if !xl.isBucketExist(bucket) {
		return traceError(BucketNotFound{Bucket: bucket})
	}

	// Random ID used for lock instrumentation.
	lockID := getOpsID()

	// Hold the bucket-level write lock for the whole heal.
	nsMutex.Lock(bucket, "", lockID)
	defer nsMutex.Unlock(bucket, "", lockID)

	// One error slot per disk; each goroutine writes only its own slot.
	diskErrs := make([]error, len(xl.storageDisks))

	// Tracks the per-disk goroutines started below.
	var pending sync.WaitGroup

	for i := range xl.storageDisks {
		d := xl.storageDisks[i]
		if d == nil {
			// Offline disk: record it and move on.
			diskErrs[i] = traceError(errDiskNotFound)
			continue
		}
		pending.Add(1)
		// Check (and if needed create) the volume concurrently per disk.
		go func(slot int, d StorageAPI) {
			defer pending.Done()
			_, statErr := d.StatVol(bucket)
			switch {
			case statErr == nil:
				// Volume already present, nothing to heal on this disk.
			case statErr != errVolumeNotFound:
				// Any failure other than "missing volume" is reported as-is.
				diskErrs[slot] = traceError(statErr)
			default:
				// Volume is missing: create it.
				if mkErr := d.MakeVol(bucket); mkErr != nil {
					diskErrs[slot] = traceError(mkErr)
				}
			}
		}(i, d)
	}

	// Block until every disk has been processed.
	pending.Wait()

	// Without write quorum the heal is considered failed.
	if !isDiskQuorum(diskErrs, xl.writeQuorum) {
		// Purge successfully created buckets if we don't have writeQuorum.
		xl.undoMakeBucket(bucket)
		return toObjectErr(traceError(errXLWriteQuorum), bucket)
	}

	// Errors listed here are passed to reduceErrs as ignorable; anything
	// else it reports is returned as a failure.
	ignoredErrs := []error{
		errDiskNotFound,
		errFaultyDisk,
		errDiskAccessDenied,
	}
	if reducedErr := reduceErrs(diskErrs, ignoredErrs); reducedErr != nil {
		return toObjectErr(reducedErr, bucket)
	}
	return nil
}

// HealObject heals a given object for all its missing entries.
//
// xl.json is read from every disk; disks holding an outdated or missing copy
// are rebuilt from the disks holding the latest copy. Healed data is first
// written under a temporary prefix in minioMetaBucket and then renamed into
// "bucket/object/" on each healed disk.
//
// Returns nil when there is nothing to heal or when healing succeeds, and the
// first error encountered otherwise.
//
// NOTE(review): on failure, data already written under the temporary prefix
// is not cleaned up by this function.
//
// FIXME: If an object was deleted while one disk was down, and later the disk
// comes back up again, heal on the object should delete it.
func (xl xlObjects) HealObject(bucket, object string) error {
	// Verify if bucket is valid.
	if !IsValidBucketName(bucket) {
		return traceError(BucketNameInvalid{Bucket: bucket})
	}

	// Verify if object is valid.
	if !IsValidObjectName(object) {
		return traceError(ObjectNameInvalid{Bucket: bucket, Object: object})
	}

	// get a random ID for lock instrumentation.
	opsID := getOpsID()

	// Lock the object before healing.
	// NOTE(review): only a read lock is taken although outdated disks are
	// modified below - confirm this is intentional.
	nsMutex.RLock(bucket, object, opsID)
	defer nsMutex.RUnlock(bucket, object, opsID)

	partsMetadata, errs := readAllXLMetadata(xl.storageDisks, bucket, object)
	if err := reduceErrs(errs, nil); err != nil {
		return toObjectErr(err, bucket, object)
	}

	if !xlShouldHeal(partsMetadata, errs) {
		// There is nothing to heal.
		return nil
	}

	// List of disks having latest version of the object.
	latestDisks, modTime := listOnlineDisks(xl.storageDisks, partsMetadata, errs)
	// List of disks having outdated version of the object or missing object.
	outDatedDisks := outDatedDisks(xl.storageDisks, partsMetadata, errs)
	// Latest xlMetaV1 for reference.
	latestMeta := pickValidXLMeta(partsMetadata, modTime)

	for index, disk := range outDatedDisks {
		// Before healing outdated disks, we need to remove xl.json
		// and part files from "bucket/object/" so that
		// rename(".minio.sys", "tmp/tmpuuid/", "bucket", "object/") succeeds.
		if disk == nil {
			// Not an outdated disk.
			continue
		}
		if errs[index] != nil {
			// xl.json could not be read from this disk (most likely
			// errFileNotFound), so there is no part list to clean up.
			continue
		}
		// Outdated object with the same name exists that needs to be deleted.
		// Delete all the parts. A part that is already missing is not a
		// failure: the goal is only that the stale file is gone.
		for _, part := range partsMetadata[index].Parts {
			err := disk.DeleteFile(bucket, pathJoin(object, part.Name))
			if err != nil && err != errFileNotFound {
				return traceError(err)
			}
		}
		// Delete xl.json file, likewise ignoring an already-missing file.
		err := disk.DeleteFile(bucket, pathJoin(object, xlMetaJSONFile))
		if err != nil && err != errFileNotFound {
			return traceError(err)
		}
	}

	// Reorder so that we have data disks first and parity disks next.
	latestDisks = getOrderedDisks(latestMeta.Erasure.Distribution, latestDisks)
	outDatedDisks = getOrderedDisks(latestMeta.Erasure.Distribution, outDatedDisks)
	partsMetadata = getOrderedPartsMetadata(latestMeta.Erasure.Distribution, partsMetadata)

	// We write at a temporary location and then rename to the final location.
	tmpID := getUUID()

	// Checksum of the part files. checkSumInfos[index] will contain checksums
	// of all the part files in the outDatedDisks[index]
	checkSumInfos := make([][]checkSumInfo, len(outDatedDisks))

	// Heal each part. erasureHealFile() is asked to write the healed part
	// into minioMetaBucket under tmpMetaPrefix/<tmpID>/, which needs to be
	// renamed later to the final location.
	for _, part := range latestMeta.Parts {
		partName := part.Name
		partSize := part.Size
		erasure := latestMeta.Erasure
		sumInfo := latestMeta.Erasure.GetCheckSumInfo(partName)
		// Heal the part file.
		checkSums, err := erasureHealFile(latestDisks, outDatedDisks,
			bucket, pathJoin(object, partName),
			minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID, partName),
			partSize, erasure.BlockSize, erasure.DataBlocks, erasure.ParityBlocks, sumInfo.Algorithm)
		if err != nil {
			return err
		}
		// Record the checksum of the healed part for every disk that was
		// actually healed; checkSums is indexed like outDatedDisks.
		for index, sum := range checkSums {
			if outDatedDisks[index] == nil {
				continue
			}
			checkSumInfos[index] = append(checkSumInfos[index], checkSumInfo{partName, sumInfo.Algorithm, sum})
		}
	}

	// xl.json should be written to all the healed disks: start from the
	// latest metadata and substitute the checksums computed for that disk.
	for index, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		partsMetadata[index] = latestMeta
		partsMetadata[index].Erasure.Checksum = checkSumInfos[index]
	}

	// Generate and write `xl.json` generated from other disks.
	err := writeUniqueXLMetadata(outDatedDisks, minioMetaBucket, pathJoin(tmpMetaPrefix, tmpID), partsMetadata, diskCount(outDatedDisks))
	if err != nil {
		return toObjectErr(err, bucket, object)
	}

	// Rename from tmp location to the actual location.
	for _, disk := range outDatedDisks {
		if disk == nil {
			continue
		}
		// Remove any lingering partial data from current namespace.
		err = disk.DeleteFile(bucket, retainSlash(object))
		if err != nil && err != errFileNotFound {
			return traceError(err)
		}
		// Attempt a rename now from healed data to final location.
		err = disk.RenameFile(minioMetaBucket, retainSlash(pathJoin(tmpMetaPrefix, tmpID)), bucket, retainSlash(object))
		if err != nil {
			return traceError(err)
		}
	}
	return nil
}