mirror of
https://github.com/minio/minio.git
synced 2024-12-23 21:55:53 -05:00
Remove unusued params and functions (#8399)
This commit is contained in:
parent
b7ee0bbbc9
commit
d48fd6fde9
@ -1606,10 +1606,7 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// Use buffered channel to take care of burst sends or slow w.Write()
|
||||
traceCh := make(chan interface{}, 4000)
|
||||
|
||||
peers, err := getRestClients(getRemoteHosts(globalEndpoints))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
peers := getRestClients(getRemoteHosts(globalEndpoints))
|
||||
|
||||
globalHTTPTrace.Subscribe(traceCh, doneCh, func(entry interface{}) bool {
|
||||
return mustTrace(entry, trcAll, trcErr)
|
||||
@ -1674,10 +1671,7 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque
|
||||
logCh := make(chan interface{}, 4000)
|
||||
|
||||
remoteHosts := getRemoteHosts(globalEndpoints)
|
||||
peers, err := getRestClients(remoteHosts)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
peers := getRestClients(remoteHosts)
|
||||
|
||||
globalConsoleSys.Subscribe(logCh, doneCh, node, limitLines, logKind, nil)
|
||||
|
||||
|
@ -331,7 +331,7 @@ func initTestXLObjLayer() (ObjectLayer, []string, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
endpoints := mustGetNewEndpointList(xlDirs...)
|
||||
format, err := waitForFormatXL(context.Background(), true, endpoints, 1, 16)
|
||||
format, err := waitForFormatXL(true, endpoints, 1, 16)
|
||||
if err != nil {
|
||||
removeRoots(xlDirs)
|
||||
return nil, nil, err
|
||||
|
@ -607,9 +607,7 @@ func (h *healSequence) healItemsFromSourceCh() error {
|
||||
}
|
||||
|
||||
func (h *healSequence) healFromSourceCh() {
|
||||
if err := h.healItemsFromSourceCh(); err != nil {
|
||||
h.traverseAndHealDoneCh <- err
|
||||
}
|
||||
h.healItemsFromSourceCh()
|
||||
close(h.traverseAndHealDoneCh)
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
// - delimiter if set should be equal to '/', otherwise the request is rejected.
|
||||
// - marker if set should have a common prefix with 'prefix' param, otherwise
|
||||
// the request is rejected.
|
||||
func validateListObjectsArgs(prefix, marker, delimiter, encodingType string, maxKeys int) APIErrorCode {
|
||||
func validateListObjectsArgs(marker, delimiter, encodingType string, maxKeys int) APIErrorCode {
|
||||
// Max keys cannot be negative.
|
||||
if maxKeys < 0 {
|
||||
return ErrInvalidMaxKeys
|
||||
@ -82,7 +82,7 @@ func (api objectAPIHandlers) ListBucketObjectVersionsHandler(w http.ResponseWrit
|
||||
}
|
||||
|
||||
// Validate the query params before beginning to serve the request.
|
||||
if s3Error := validateListObjectsArgs(prefix, marker, delimiter, encodingType, maxkeys); s3Error != ErrNone {
|
||||
if s3Error := validateListObjectsArgs(marker, delimiter, encodingType, maxkeys); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
@ -164,7 +164,7 @@ func (api objectAPIHandlers) ListObjectsV2Handler(w http.ResponseWriter, r *http
|
||||
|
||||
// Validate the query params before beginning to serve the request.
|
||||
// fetch-owner is not validated since it is a boolean
|
||||
if s3Error := validateListObjectsArgs(prefix, token, delimiter, encodingType, maxKeys); s3Error != ErrNone {
|
||||
if s3Error := validateListObjectsArgs(token, delimiter, encodingType, maxKeys); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
@ -241,7 +241,7 @@ func (api objectAPIHandlers) ListObjectsV1Handler(w http.ResponseWriter, r *http
|
||||
}
|
||||
|
||||
// Validate all the query params before beginning to serve the request.
|
||||
if s3Error := validateListObjectsArgs(prefix, marker, delimiter, encodingType, maxKeys); s3Error != ErrNone {
|
||||
if s3Error := validateListObjectsArgs(marker, delimiter, encodingType, maxKeys); s3Error != ErrNone {
|
||||
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(s3Error), r.URL, guessIsBrowserReq(r))
|
||||
return
|
||||
}
|
||||
|
@ -252,11 +252,11 @@ func handleCommonEnvVars() {
|
||||
|
||||
}
|
||||
|
||||
func logStartupMessage(msg string, data ...interface{}) {
|
||||
func logStartupMessage(msg string) {
|
||||
if globalConsoleSys != nil {
|
||||
globalConsoleSys.Send(msg, string(logger.All))
|
||||
}
|
||||
logger.StartupMessage(msg, data...)
|
||||
logger.StartupMessage(msg)
|
||||
}
|
||||
|
||||
func getTLSConfig() (x509Certs []*x509.Certificate, c *certs.Certs, secureConn bool, err error) {
|
||||
|
@ -222,7 +222,7 @@ func (c *diskCache) purge() {
|
||||
continue
|
||||
}
|
||||
|
||||
objInfo, err := c.statCache(ctx, pathJoin(c.dir, obj.Name()))
|
||||
objInfo, err := c.statCache(pathJoin(c.dir, obj.Name()))
|
||||
if err != nil {
|
||||
// delete any partially filled cache entry left behind.
|
||||
removeAll(pathJoin(c.dir, obj.Name()))
|
||||
@ -274,7 +274,7 @@ func (c *diskCache) IsOnline() bool {
|
||||
// Stat returns ObjectInfo from disk cache
|
||||
func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectInfo, err error) {
|
||||
cacheObjPath := getCacheSHADir(c.dir, bucket, object)
|
||||
oi, err = c.statCache(ctx, cacheObjPath)
|
||||
oi, err = c.statCache(cacheObjPath)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -288,7 +288,7 @@ func (c *diskCache) Stat(ctx context.Context, bucket, object string) (oi ObjectI
|
||||
}
|
||||
|
||||
// statCache is a convenience function for purge() to get ObjectInfo for cached object
|
||||
func (c *diskCache) statCache(ctx context.Context, cacheObjPath string) (oi ObjectInfo, e error) {
|
||||
func (c *diskCache) statCache(cacheObjPath string) (oi ObjectInfo, e error) {
|
||||
// Stat the file to get file size.
|
||||
metaPath := path.Join(cacheObjPath, cacheMetaJSONFile)
|
||||
f, err := os.Open(metaPath)
|
||||
@ -365,14 +365,10 @@ func getCacheSHADir(dir, bucket, object string) string {
|
||||
}
|
||||
|
||||
// Cache data to disk with bitrot checksum added for each block of 1MB
|
||||
func (c *diskCache) bitrotWriteToCache(ctx context.Context, cachePath string, reader io.Reader, size uint64) (int64, error) {
|
||||
func (c *diskCache) bitrotWriteToCache(cachePath string, reader io.Reader, size uint64) (int64, error) {
|
||||
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
bufSize := uint64(readSizeV1)
|
||||
if size > 0 && bufSize > size {
|
||||
bufSize = size
|
||||
}
|
||||
filePath := path.Join(cachePath, cacheDataFile)
|
||||
|
||||
if filePath == "" || reader == nil {
|
||||
@ -474,11 +470,6 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
|
||||
if err := os.MkdirAll(cachePath, 0777); err != nil {
|
||||
return err
|
||||
}
|
||||
bufSize := int64(readSizeV1)
|
||||
if size > 0 && bufSize > size {
|
||||
bufSize = size
|
||||
}
|
||||
|
||||
var metadata = make(map[string]string)
|
||||
for k, v := range opts.UserDefined {
|
||||
metadata[k] = v
|
||||
@ -493,7 +484,7 @@ func (c *diskCache) Put(ctx context.Context, bucket, object string, data io.Read
|
||||
}
|
||||
actualSize, _ = sio.EncryptedSize(uint64(size))
|
||||
}
|
||||
n, err := c.bitrotWriteToCache(ctx, cachePath, reader, actualSize)
|
||||
n, err := c.bitrotWriteToCache(cachePath, reader, actualSize)
|
||||
if IsErr(err, baseErrs...) {
|
||||
c.setOnline(false)
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ func (c *cacheObjects) DeleteObject(ctx context.Context, bucket, object string)
|
||||
return
|
||||
}
|
||||
|
||||
dcache, cerr := c.getCacheLoc(ctx, bucket, object)
|
||||
dcache, cerr := c.getCacheLoc(bucket, object)
|
||||
if cerr != nil {
|
||||
return
|
||||
}
|
||||
@ -339,7 +339,7 @@ func (c *cacheObjects) isCacheExclude(bucket, object string) bool {
|
||||
// choose a cache deterministically based on hash of bucket,object. The hash index is treated as
|
||||
// a hint. In the event that the cache drive at hash index is offline, treat the list of cache drives
|
||||
// as a circular buffer and walk through them starting at hash index until an online drive is found.
|
||||
func (c *cacheObjects) getCacheLoc(ctx context.Context, bucket, object string) (*diskCache, error) {
|
||||
func (c *cacheObjects) getCacheLoc(bucket, object string) (*diskCache, error) {
|
||||
index := c.hashIndex(bucket, object)
|
||||
numDisks := len(c.cache)
|
||||
for k := 0; k < numDisks; k++ {
|
||||
|
@ -77,12 +77,11 @@ func TestGetCachedLoc(t *testing.T) {
|
||||
c := cacheObjects{cache: d}
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
ctx := context.Background()
|
||||
// find cache drive where object would be hashed
|
||||
index := c.hashIndex(bucketName, objectName)
|
||||
// turn off drive by setting online status to false
|
||||
c.cache[index].online = false
|
||||
cfs, err := c.getCacheLoc(ctx, bucketName, objectName)
|
||||
cfs, err := c.getCacheLoc(bucketName, objectName)
|
||||
if n == 1 && err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
@ -118,12 +117,11 @@ func TestGetCacheMaxUse(t *testing.T) {
|
||||
|
||||
bucketName := "testbucket"
|
||||
objectName := "testobject"
|
||||
ctx := context.Background()
|
||||
// find cache drive where object would be hashed
|
||||
index := c.hashIndex(bucketName, objectName)
|
||||
// turn off drive by setting online status to false
|
||||
c.cache[index].online = false
|
||||
cb, err := c.getCacheLoc(ctx, bucketName, objectName)
|
||||
cb, err := c.getCacheLoc(bucketName, objectName)
|
||||
if n == 1 && err == errDiskNotFound {
|
||||
continue
|
||||
}
|
||||
|
@ -374,10 +374,10 @@ func newDecryptReader(client io.Reader, key []byte, bucket, object string, seqNu
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newDecryptReaderWithObjectKey(client, objectEncryptionKey, seqNumber, metadata)
|
||||
return newDecryptReaderWithObjectKey(client, objectEncryptionKey, seqNumber)
|
||||
}
|
||||
|
||||
func newDecryptReaderWithObjectKey(client io.Reader, objectEncryptionKey []byte, seqNumber uint32, metadata map[string]string) (io.Reader, error) {
|
||||
func newDecryptReaderWithObjectKey(client io.Reader, objectEncryptionKey []byte, seqNumber uint32) (io.Reader, error) {
|
||||
reader, err := sio.DecryptReader(client, sio.Config{
|
||||
Key: objectEncryptionKey,
|
||||
SequenceNumber: seqNumber,
|
||||
@ -527,7 +527,7 @@ func (d *DecryptBlocksReader) buildDecrypter(partID int) error {
|
||||
// Limit the reader, so the decryptor doesnt receive bytes
|
||||
// from the next part (different DARE stream)
|
||||
encLenToRead := d.parts[d.partIndex].Size - d.partEncRelOffset
|
||||
decrypter, err := newDecryptReaderWithObjectKey(io.LimitReader(d.reader, encLenToRead), partEncryptionKey, d.startSeqNum, m)
|
||||
decrypter, err := newDecryptReaderWithObjectKey(io.LimitReader(d.reader, encLenToRead), partEncryptionKey, d.startSeqNum)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -579,52 +579,6 @@ func (d *DecryptBlocksReader) Read(p []byte) (int, error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// getEncryptedMultipartsOffsetLength - fetch sequence number, encrypted start offset and encrypted length.
|
||||
func getEncryptedMultipartsOffsetLength(offset, length int64, obj ObjectInfo) (uint32, int64, int64) {
|
||||
// Calculate encrypted offset of a multipart object
|
||||
computeEncOffset := func(off int64, obj ObjectInfo) (seqNumber uint32, encryptedOffset int64, err error) {
|
||||
var curPartEndOffset uint64
|
||||
var prevPartsEncSize int64
|
||||
for _, p := range obj.Parts {
|
||||
size, decErr := sio.DecryptedSize(uint64(p.Size))
|
||||
if decErr != nil {
|
||||
err = errObjectTampered // assign correct error type
|
||||
return
|
||||
}
|
||||
if off < int64(curPartEndOffset+size) {
|
||||
seqNumber, encryptedOffset, _ = getEncryptedSinglePartOffsetLength(off-int64(curPartEndOffset), 1, obj)
|
||||
encryptedOffset += int64(prevPartsEncSize)
|
||||
break
|
||||
}
|
||||
curPartEndOffset += size
|
||||
prevPartsEncSize += p.Size
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Calculate the encrypted start offset corresponding to the plain offset
|
||||
seqNumber, encStartOffset, _ := computeEncOffset(offset, obj)
|
||||
// Calculate also the encrypted end offset corresponding to plain offset + plain length
|
||||
_, encEndOffset, _ := computeEncOffset(offset+length-1, obj)
|
||||
|
||||
// encLength is the diff between encrypted end offset and encrypted start offset + one package size
|
||||
// to ensure all encrypted data are covered
|
||||
encLength := encEndOffset - encStartOffset + (64*1024 + 32)
|
||||
|
||||
// Calculate total size of all parts
|
||||
var totalPartsLength int64
|
||||
for _, p := range obj.Parts {
|
||||
totalPartsLength += p.Size
|
||||
}
|
||||
|
||||
// Set encLength to maximum possible value if it exceeded total parts size
|
||||
if encLength+encStartOffset > totalPartsLength {
|
||||
encLength = totalPartsLength - encStartOffset
|
||||
}
|
||||
|
||||
return seqNumber, encStartOffset, encLength
|
||||
}
|
||||
|
||||
// getEncryptedSinglePartOffsetLength - fetch sequence number, encrypted start offset and encrypted length.
|
||||
func getEncryptedSinglePartOffsetLength(offset, length int64, objInfo ObjectInfo) (seqNumber uint32, encOffset int64, encLength int64) {
|
||||
onePkgSize := int64(SSEDAREPackageBlockSize + SSEDAREPackageMetaSize)
|
||||
|
@ -384,7 +384,7 @@ func migrateCacheData(ctx context.Context, c *diskCache, bucket, object, oldfile
|
||||
actualSize, _ = sio.EncryptedSize(uint64(st.Size()))
|
||||
}
|
||||
|
||||
_, err = c.bitrotWriteToCache(ctx, destDir, reader, uint64(actualSize))
|
||||
_, err = c.bitrotWriteToCache(destDir, reader, uint64(actualSize))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -160,7 +160,7 @@ func formatFSMigrate(ctx context.Context, wlk *lock.LockedFile, fsPath string) e
|
||||
}
|
||||
|
||||
// Creates a new format.json if unformatted.
|
||||
func createFormatFS(ctx context.Context, fsFormatPath string) error {
|
||||
func createFormatFS(fsFormatPath string) error {
|
||||
// Attempt a write lock on formatConfigFile `format.json`
|
||||
// file stored in minioMetaBucket(.minio.sys) directory.
|
||||
lk, err := lock.TryLockedOpenFile(fsFormatPath, os.O_RDWR|os.O_CREATE, 0600)
|
||||
@ -215,7 +215,7 @@ func initFormatFS(ctx context.Context, fsPath string) (rlk *lock.RLockedFile, er
|
||||
rlk.Close()
|
||||
}
|
||||
// Fresh disk - create format.json
|
||||
err = createFormatFS(ctx, fsFormatPath)
|
||||
err = createFormatFS(fsFormatPath)
|
||||
if err == lock.ErrAlreadyLocked {
|
||||
// Lock already present, sleep and attempt again.
|
||||
// Can happen in a rare situation when a parallel minio process
|
||||
|
@ -362,7 +362,6 @@ func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
|
||||
entries = append(entries, fi.Name())
|
||||
}
|
||||
}
|
||||
fis = nil
|
||||
return minio.FilterMatchingPrefix(entries, prefixEntry)
|
||||
}
|
||||
|
||||
|
@ -84,8 +84,6 @@ const (
|
||||
// GlobalServiceExecutionInterval - Executes the Lifecycle events.
|
||||
GlobalServiceExecutionInterval = time.Hour * 24 // 24 hrs.
|
||||
|
||||
// Refresh interval to update in-memory bucket lifecycle cache.
|
||||
globalRefreshBucketLifecycleInterval = 5 * time.Minute
|
||||
// Refresh interval to update in-memory iam config cache.
|
||||
globalRefreshIAMInterval = 5 * time.Minute
|
||||
|
||||
|
@ -375,11 +375,6 @@ func getResource(path string, host string, domains []string) (string, error) {
|
||||
return path, nil
|
||||
}
|
||||
|
||||
// If none of the http routes match respond with MethodNotAllowed, in JSON
|
||||
func notFoundHandlerJSON(w http.ResponseWriter, r *http.Request) {
|
||||
writeErrorResponseJSON(context.Background(), w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL)
|
||||
}
|
||||
|
||||
// If none of the http routes match respond with MethodNotAllowed
|
||||
func notFoundHandler(w http.ResponseWriter, r *http.Request) {
|
||||
writeErrorResponse(context.Background(), w, errorCodes.ToAPIErr(ErrMethodNotAllowed), r.URL, guessIsBrowserReq(r))
|
||||
|
@ -678,7 +678,7 @@ func (sys *IAMSys) SetUser(accessKey string, uinfo madmin.UserInfo) error {
|
||||
|
||||
// Set policy if specified.
|
||||
if uinfo.PolicyName != "" {
|
||||
return sys.policyDBSet(objectAPI, accessKey, uinfo.PolicyName, false, false)
|
||||
return sys.policyDBSet(accessKey, uinfo.PolicyName, false, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -962,12 +962,12 @@ func (sys *IAMSys) PolicyDBSet(name, policy string, isGroup bool) error {
|
||||
|
||||
// isSTS is always false when called via PolicyDBSet as policy
|
||||
// is never set by an external API call for STS users.
|
||||
return sys.policyDBSet(objectAPI, name, policy, false, isGroup)
|
||||
return sys.policyDBSet(name, policy, false, isGroup)
|
||||
}
|
||||
|
||||
// policyDBSet - sets a policy for user in the policy db. Assumes that
|
||||
// caller has sys.Lock().
|
||||
func (sys *IAMSys) policyDBSet(objectAPI ObjectLayer, name, policy string, isSTS, isGroup bool) error {
|
||||
func (sys *IAMSys) policyDBSet(name, policy string, isSTS, isGroup bool) error {
|
||||
if name == "" || policy == "" {
|
||||
return errInvalidArgument
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ func (n *nsLockMap) lock(ctx context.Context, volume, path string, lockSource, o
|
||||
}
|
||||
|
||||
// Unlock the namespace resource.
|
||||
func (n *nsLockMap) unlock(volume, path, opsID string, readLock bool) {
|
||||
func (n *nsLockMap) unlock(volume, path string, readLock bool) {
|
||||
param := nsParam{volume, path}
|
||||
n.lockMapMutex.RLock()
|
||||
nsLk, found := n.lockMap[param]
|
||||
@ -207,7 +207,7 @@ func (n *nsLockMap) Lock(volume, path, opsID string, timeout time.Duration) (loc
|
||||
// Unlock - unlocks any previously acquired write locks.
|
||||
func (n *nsLockMap) Unlock(volume, path, opsID string) {
|
||||
readLock := false
|
||||
n.unlock(volume, path, opsID, readLock)
|
||||
n.unlock(volume, path, readLock)
|
||||
}
|
||||
|
||||
// RLock - locks any previously acquired read locks.
|
||||
@ -221,7 +221,7 @@ func (n *nsLockMap) RLock(volume, path, opsID string, timeout time.Duration) (lo
|
||||
// RUnlock - unlocks any previously acquired read locks.
|
||||
func (n *nsLockMap) RUnlock(volume, path, opsID string) {
|
||||
readLock := true
|
||||
n.unlock(volume, path, opsID, readLock)
|
||||
n.unlock(volume, path, readLock)
|
||||
}
|
||||
|
||||
// ForceUnlock - forcefully unlock a lock based on name.
|
||||
@ -325,7 +325,7 @@ func (li *localLockInstance) GetLock(timeout *dynamicTimeout) (timedOutErr error
|
||||
// Unlock - block until write lock is released.
|
||||
func (li *localLockInstance) Unlock() {
|
||||
readLock := false
|
||||
li.ns.unlock(li.volume, li.path, li.opsID, readLock)
|
||||
li.ns.unlock(li.volume, li.path, readLock)
|
||||
}
|
||||
|
||||
// RLock - block until read lock is taken or timeout has occurred.
|
||||
@ -344,7 +344,7 @@ func (li *localLockInstance) GetRLock(timeout *dynamicTimeout) (timedOutErr erro
|
||||
// RUnlock - block until read lock is released.
|
||||
func (li *localLockInstance) RUnlock() {
|
||||
readLock := true
|
||||
li.ns.unlock(li.volume, li.path, li.opsID, readLock)
|
||||
li.ns.unlock(li.volume, li.path, readLock)
|
||||
}
|
||||
|
||||
func getSource() string {
|
||||
|
@ -1076,10 +1076,7 @@ func (sys *NotificationSys) CPUInfo() []madmin.ServerCPUHardwareInfo {
|
||||
func NewNotificationSys(config *serverConfig, endpoints EndpointList) *NotificationSys {
|
||||
targetList := getNotificationTargets(config)
|
||||
remoteHosts := getRemoteHosts(endpoints)
|
||||
remoteClients, err := getRestClients(remoteHosts)
|
||||
if err != nil {
|
||||
logger.FatalIf(err, "Unable to start notification sub system")
|
||||
}
|
||||
remoteClients := getRestClients(remoteHosts)
|
||||
|
||||
// bucketRulesMap/bucketRemoteTargetRulesMap are initialized by NotificationSys.Init()
|
||||
return &NotificationSys{
|
||||
|
@ -147,7 +147,7 @@ func cleanupDir(ctx context.Context, storage StorageAPI, volume, dirPath string)
|
||||
}
|
||||
|
||||
// Cleanup objects in bulk and recursively: each object will have a list of sub-files to delete in the backend
|
||||
func cleanupObjectsBulk(ctx context.Context, storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) {
|
||||
func cleanupObjectsBulk(storage StorageAPI, volume string, objsPaths []string, errs []error) ([]error, error) {
|
||||
// The list of files in disk to delete
|
||||
var filesToDelete []string
|
||||
// Map files to delete to the passed objsPaths
|
||||
@ -236,7 +236,7 @@ func removeListenerConfig(ctx context.Context, objAPI ObjectLayer, bucket string
|
||||
return objAPI.DeleteObject(ctx, minioMetaBucket, lcPath)
|
||||
}
|
||||
|
||||
func listObjectsNonSlash(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
func listObjectsNonSlash(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
endWalkCh := make(chan struct{})
|
||||
defer close(endWalkCh)
|
||||
recursive := true
|
||||
@ -321,7 +321,7 @@ func listObjectsNonSlash(ctx context.Context, obj ObjectLayer, bucket, prefix, m
|
||||
|
||||
func listObjects(ctx context.Context, obj ObjectLayer, bucket, prefix, marker, delimiter string, maxKeys int, tpool *TreeWalkPool, listDir ListDirFunc, getObjInfo func(context.Context, string, string) (ObjectInfo, error), getObjectInfoDirs ...func(context.Context, string, string) (ObjectInfo, error)) (loi ListObjectsInfo, err error) {
|
||||
if delimiter != SlashSeparator && delimiter != "" {
|
||||
return listObjectsNonSlash(ctx, obj, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, getObjInfo, getObjectInfoDirs...)
|
||||
return listObjectsNonSlash(ctx, bucket, prefix, marker, delimiter, maxKeys, tpool, listDir, getObjInfo, getObjectInfoDirs...)
|
||||
}
|
||||
|
||||
if err := checkListObjsArgs(ctx, bucket, prefix, marker, delimiter, obj); err != nil {
|
||||
|
@ -2132,7 +2132,7 @@ func (w *whiteSpaceWriter) WriteHeader(statusCode int) {
|
||||
// we do background append as and when the parts arrive and completeMultiPartUpload
|
||||
// is quick. Only in a rare case where parts would be out of order will
|
||||
// FS:completeMultiPartUpload() take a longer time.
|
||||
func sendWhiteSpace(ctx context.Context, w http.ResponseWriter) <-chan bool {
|
||||
func sendWhiteSpace(w http.ResponseWriter) <-chan bool {
|
||||
doneCh := make(chan bool)
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * 10)
|
||||
@ -2302,7 +2302,7 @@ func (api objectAPIHandlers) CompleteMultipartUploadHandler(w http.ResponseWrite
|
||||
}
|
||||
w.Header().Set(xhttp.ContentType, "text/event-stream")
|
||||
w = &whiteSpaceWriter{ResponseWriter: w, Flusher: w.(http.Flusher)}
|
||||
completeDoneCh := sendWhiteSpace(ctx, w)
|
||||
completeDoneCh := sendWhiteSpace(w)
|
||||
objInfo, err := completeMultiPartUpload(ctx, bucket, object, uploadID, completeParts, opts)
|
||||
// Stop writing white spaces to the client. Note that close(doneCh) style is not used as it
|
||||
// can cause white space to be written after we send XML response in a race condition.
|
||||
|
@ -47,9 +47,8 @@ type peerRESTClient struct {
|
||||
}
|
||||
|
||||
// Reconnect to a peer rest server.
|
||||
func (client *peerRESTClient) reConnect() error {
|
||||
func (client *peerRESTClient) reConnect() {
|
||||
atomic.StoreInt32(&client.connected, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Wrapper to restClient.Call to handle network errors, in case of network error the connection is marked disconnected
|
||||
@ -64,11 +63,7 @@ func (client *peerRESTClient) call(method string, values url.Values, body io.Rea
|
||||
// after verifying format.json
|
||||
func (client *peerRESTClient) callWithContext(ctx context.Context, method string, values url.Values, body io.Reader, length int64) (respBody io.ReadCloser, err error) {
|
||||
if !client.IsOnline() {
|
||||
err := client.reConnect()
|
||||
logger.LogIf(ctx, err)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
client.reConnect()
|
||||
}
|
||||
|
||||
if values == nil {
|
||||
@ -694,7 +689,7 @@ func getRemoteHosts(endpoints EndpointList) []*xnet.Host {
|
||||
return remoteHosts
|
||||
}
|
||||
|
||||
func getRestClients(peerHosts []*xnet.Host) ([]*peerRESTClient, error) {
|
||||
func getRestClients(peerHosts []*xnet.Host) []*peerRESTClient {
|
||||
restClients := make([]*peerRESTClient, len(peerHosts))
|
||||
for i, host := range peerHosts {
|
||||
client, err := newPeerRESTClient(host)
|
||||
@ -704,7 +699,7 @@ func getRestClients(peerHosts []*xnet.Host) ([]*peerRESTClient, error) {
|
||||
restClients[i] = client
|
||||
}
|
||||
|
||||
return restClients, nil
|
||||
return restClients
|
||||
}
|
||||
|
||||
// Returns a peer rest client.
|
||||
|
@ -240,7 +240,7 @@ func checkPostPolicy(formValues http.Header, postPolicyForm PostPolicyForm) erro
|
||||
}
|
||||
|
||||
// Flag to indicate if all policies conditions are satisfied
|
||||
condPassed := true
|
||||
var condPassed bool
|
||||
|
||||
// Iterate over policy conditions and check them against received form fields
|
||||
for _, policy := range postPolicyForm.Conditions.Policies {
|
||||
|
@ -278,7 +278,7 @@ func connectLoadInitFormats(retryCount int, firstDisk bool, endpoints EndpointLi
|
||||
}
|
||||
|
||||
// Format disks before initialization of object layer.
|
||||
func waitForFormatXL(ctx context.Context, firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) {
|
||||
func waitForFormatXL(firstDisk bool, endpoints EndpointList, setCount, disksPerSet int) (format *formatXLV3, err error) {
|
||||
if len(endpoints) == 0 || setCount == 0 || disksPerSet == 0 {
|
||||
return nil, errInvalidArgument
|
||||
}
|
||||
|
@ -414,7 +414,7 @@ func newObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err error) {
|
||||
return NewFSObjectLayer(endpoints[0].Path)
|
||||
}
|
||||
|
||||
format, err := waitForFormatXL(context.Background(), endpoints[0].IsLocal, endpoints, globalXLSetCount, globalXLSetDriveCount)
|
||||
format, err := waitForFormatXL(endpoints[0].IsLocal, endpoints, globalXLSetCount, globalXLSetDriveCount)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -101,9 +101,6 @@ func unescapeQueries(encodedQuery string) (unescapedQueries []string, err error)
|
||||
// - http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html#RESTAuthenticationQueryStringAuth
|
||||
// returns ErrNone if matches. S3 errors otherwise.
|
||||
func doesPresignV2SignatureMatch(r *http.Request) APIErrorCode {
|
||||
// Access credentials.
|
||||
cred := globalServerConfig.GetCredential()
|
||||
|
||||
// r.RequestURI will have raw encoded URI as sent by the client.
|
||||
tokens := strings.SplitN(r.RequestURI, "?", 2)
|
||||
encodedResource := tokens[0]
|
||||
|
@ -143,13 +143,3 @@ var stsErrCodes = stsErrorCodeMap{
|
||||
HTTPStatusCode: http.StatusInternalServerError,
|
||||
},
|
||||
}
|
||||
|
||||
// getSTSErrorResponse gets in standard error and
|
||||
// provides a encodable populated response values
|
||||
func getSTSErrorResponse(err STSError, requestID string) STSErrorResponse {
|
||||
errRsp := STSErrorResponse{}
|
||||
errRsp.Error.Code = err.Code
|
||||
errRsp.Error.Message = err.Description
|
||||
errRsp.RequestID = requestID
|
||||
return errRsp
|
||||
}
|
||||
|
@ -190,7 +190,7 @@ func prepareXLSets32() (ObjectLayer, []string, error) {
|
||||
|
||||
endpoints := append(endpoints1, endpoints2...)
|
||||
fsDirs := append(fsDirs1, fsDirs2...)
|
||||
format, err := waitForFormatXL(context.Background(), true, endpoints, 2, 16)
|
||||
format, err := waitForFormatXL(true, endpoints, 2, 16)
|
||||
if err != nil {
|
||||
removeRoots(fsDirs)
|
||||
return nil, nil, err
|
||||
@ -1612,7 +1612,7 @@ func newTestObjectLayer(endpoints EndpointList) (newObject ObjectLayer, err erro
|
||||
return NewFSObjectLayer(endpoints[0].Path)
|
||||
}
|
||||
|
||||
_, err = waitForFormatXL(context.Background(), endpoints[0].IsLocal, endpoints, 1, 16)
|
||||
_, err = waitForFormatXL(endpoints[0].IsLocal, endpoints, 1, 16)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -17,7 +17,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -77,18 +76,18 @@ func TestNewXLSets(t *testing.T) {
|
||||
}
|
||||
|
||||
endpoints := mustGetNewEndpointList(erasureDisks...)
|
||||
_, err := waitForFormatXL(context.Background(), true, endpoints, 0, 16)
|
||||
_, err := waitForFormatXL(true, endpoints, 0, 16)
|
||||
if err != errInvalidArgument {
|
||||
t.Fatalf("Expecting error, got %s", err)
|
||||
}
|
||||
|
||||
_, err = waitForFormatXL(context.Background(), true, nil, 1, 16)
|
||||
_, err = waitForFormatXL(true, nil, 1, 16)
|
||||
if err != errInvalidArgument {
|
||||
t.Fatalf("Expecting error, got %s", err)
|
||||
}
|
||||
|
||||
// Initializes all erasure disks
|
||||
format, err := waitForFormatXL(context.Background(), true, endpoints, 1, 16)
|
||||
format, err := waitForFormatXL(true, endpoints, 1, 16)
|
||||
if err != nil {
|
||||
t.Fatalf("Unable to format disks for erasure, %s", err)
|
||||
}
|
||||
|
@ -806,7 +806,7 @@ func (xl xlObjects) doDeleteObjects(ctx context.Context, bucket string, objects
|
||||
opErrs[index] = errDiskNotFound
|
||||
continue
|
||||
}
|
||||
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(ctx, disk, minioMetaTmpBucket, tmpObjs, errs)
|
||||
delObjErrs[index], opErrs[index] = cleanupObjectsBulk(disk, minioMetaTmpBucket, tmpObjs, errs)
|
||||
if opErrs[index] == errVolumeNotFound {
|
||||
opErrs[index] = nil
|
||||
}
|
||||
@ -940,7 +940,6 @@ func (xl xlObjects) DeleteObjects(ctx context.Context, bucket string, objects []
|
||||
// Avoid to increase the index if object
|
||||
// name is found to be duplicated.
|
||||
start = i
|
||||
end = i
|
||||
} else {
|
||||
i++
|
||||
}
|
||||
|
@ -39,7 +39,7 @@ var ErrNoEntriesFound = errors.New("No entries found for this key")
|
||||
const etcdPathSeparator = "/"
|
||||
|
||||
// create a new coredns service record for the bucket.
|
||||
func newCoreDNSMsg(bucket, ip string, port int, ttl uint32) ([]byte, error) {
|
||||
func newCoreDNSMsg(ip string, port int, ttl uint32) ([]byte, error) {
|
||||
return json.Marshal(&SrvRecord{
|
||||
Host: ip,
|
||||
Port: port,
|
||||
@ -156,7 +156,7 @@ func (c *coreDNS) list(key string) ([]SrvRecord, error) {
|
||||
// Adds DNS entries into etcd endpoint in CoreDNS etcd message format.
|
||||
func (c *coreDNS) Put(bucket string) error {
|
||||
for ip := range c.domainIPs {
|
||||
bucketMsg, err := newCoreDNSMsg(bucket, ip, c.domainPort, defaultTTL)
|
||||
bucketMsg, err := newCoreDNSMsg(ip, c.domainPort, defaultTTL)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -87,7 +87,7 @@ func (store *QueueStore) Open() error {
|
||||
}
|
||||
|
||||
// write - writes event to the directory.
|
||||
func (store *QueueStore) write(directory string, key string, e event.Event) error {
|
||||
func (store *QueueStore) write(key string, e event.Event) error {
|
||||
|
||||
// Marshalls the event.
|
||||
eventData, err := json.Marshal(e)
|
||||
@ -117,7 +117,7 @@ func (store *QueueStore) Put(e event.Event) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return store.write(store.directory, key, e)
|
||||
return store.write(key, e)
|
||||
}
|
||||
|
||||
// Get - gets a event from the store.
|
||||
|
@ -19,7 +19,6 @@ package madmin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@ -29,14 +28,12 @@ import (
|
||||
|
||||
// ServiceRestart - restarts the MinIO cluster
|
||||
func (adm *AdminClient) ServiceRestart() error {
|
||||
_, err := adm.serviceCallAction(ServiceActionRestart)
|
||||
return err
|
||||
return adm.serviceCallAction(ServiceActionRestart)
|
||||
}
|
||||
|
||||
// ServiceStop - stops the MinIO cluster
|
||||
func (adm *AdminClient) ServiceStop() error {
|
||||
_, err := adm.serviceCallAction(ServiceActionStop)
|
||||
return err
|
||||
return adm.serviceCallAction(ServiceActionStop)
|
||||
}
|
||||
|
||||
// ServiceAction - type to restrict service-action values
|
||||
@ -50,7 +47,7 @@ const (
|
||||
)
|
||||
|
||||
// serviceCallAction - call service restart/update/stop API.
|
||||
func (adm *AdminClient) serviceCallAction(action ServiceAction) ([]byte, error) {
|
||||
func (adm *AdminClient) serviceCallAction(action ServiceAction) error {
|
||||
queryValues := url.Values{}
|
||||
queryValues.Set("action", string(action))
|
||||
|
||||
@ -61,14 +58,14 @@ func (adm *AdminClient) serviceCallAction(action ServiceAction) ([]byte, error)
|
||||
})
|
||||
defer closeResponse(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, httpRespToErrorResponse(resp)
|
||||
return httpRespToErrorResponse(resp)
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(resp.Body)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ServiceTraceInfo holds http trace
|
||||
|
@ -119,7 +119,6 @@ func ParseHost(s string) (*Host, error) {
|
||||
return nil, err
|
||||
}
|
||||
host = s
|
||||
portStr = ""
|
||||
} else {
|
||||
if port, err = ParsePort(portStr); err != nil {
|
||||
return nil, err
|
||||
|
@ -150,7 +150,7 @@ const csvSplitSize = 128 << 10
|
||||
// startReaders will read the header if needed and spin up a parser
|
||||
// and a number of workers based on GOMAXPROCS.
|
||||
// If an error is returned no goroutines have been started and r.err will have been set.
|
||||
func (r *Reader) startReaders(in io.Reader, newReader func(io.Reader) *csv.Reader) error {
|
||||
func (r *Reader) startReaders(newReader func(io.Reader) *csv.Reader) error {
|
||||
if r.args.FileHeaderInfo != none {
|
||||
// Read column names
|
||||
// Get one line.
|
||||
@ -304,5 +304,5 @@ func NewReader(readCloser io.ReadCloser, args *ReaderArgs) (*Reader, error) {
|
||||
return ret
|
||||
}
|
||||
|
||||
return r, r.startReaders(csvIn, newCsvReader)
|
||||
return r, r.startReaders(newCsvReader)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user