mirror of
				https://github.com/minio/minio.git
				synced 2025-10-29 15:55:00 -04:00 
			
		
		
		
	Memory now evicts bucket if no more objects in memory struct
- To avoid race in expiration while accessing memory driver structs with in two competing write locks. Use lru Len() instead to know exact length to schedule for eviction. - squash both bucket and object structs, instead use a separate map to keep mutable info of lastAccessTime which can be independently used inside the expiration routine.
This commit is contained in:
		
							parent
							
								
									63ba97fd8e
								
							
						
					
					
						commit
						f8a16dd22b
					
				| @ -19,6 +19,9 @@ package server | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"reflect" | ||||
| 	"time" | ||||
| 
 | ||||
| 	"github.com/minio-io/minio/pkg/api" | ||||
| 	"github.com/minio-io/minio/pkg/api/web" | ||||
| 	"github.com/minio-io/minio/pkg/iodine" | ||||
| @ -26,8 +29,6 @@ import ( | ||||
| 	"github.com/minio-io/minio/pkg/storage/drivers/donut" | ||||
| 	"github.com/minio-io/minio/pkg/storage/drivers/memory" | ||||
| 	"github.com/minio-io/minio/pkg/utils/log" | ||||
| 	"reflect" | ||||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| // MemoryFactory is used to build memory api servers | ||||
|  | ||||
| @ -40,25 +40,19 @@ import ( | ||||
| 
 | ||||
| // memoryDriver - local variables | ||||
| type memoryDriver struct { | ||||
| 	bucketMetadata map[string]storedBucket | ||||
| 	objectMetadata map[string]storedObject | ||||
| 	objects        *lru.Cache | ||||
| 	lock           *sync.RWMutex | ||||
| 	totalSize      uint64 | ||||
| 	maxSize        uint64 | ||||
| 	expiration     time.Duration | ||||
| 	shutdown       bool | ||||
| 	storedBuckets       map[string]storedBucket | ||||
| 	lock                *sync.RWMutex | ||||
| 	objects             *lru.Cache | ||||
| 	lastAccessedObjects map[string]time.Time | ||||
| 	totalSize           uint64 | ||||
| 	maxSize             uint64 | ||||
| 	expiration          time.Duration | ||||
| 	shutdown            bool | ||||
| } | ||||
| 
 | ||||
| type storedBucket struct { | ||||
| 	metadata drivers.BucketMetadata | ||||
| 	//	owner    string // TODO | ||||
| 	//	id       string // TODO | ||||
| } | ||||
| 
 | ||||
| type storedObject struct { | ||||
| 	metadata     drivers.ObjectMetadata | ||||
| 	lastAccessed time.Time | ||||
| 	bucketMetadata drivers.BucketMetadata | ||||
| 	objectMetadata map[string]drivers.ObjectMetadata | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| @ -72,8 +66,8 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro | ||||
| 
 | ||||
| 	var memory *memoryDriver | ||||
| 	memory = new(memoryDriver) | ||||
| 	memory.bucketMetadata = make(map[string]storedBucket) | ||||
| 	memory.objectMetadata = make(map[string]storedObject) | ||||
| 	memory.storedBuckets = make(map[string]storedBucket) | ||||
| 	memory.lastAccessedObjects = make(map[string]time.Time) | ||||
| 	memory.objects = lru.New(0) | ||||
| 	memory.lock = new(sync.RWMutex) | ||||
| 	memory.expiration = expiration | ||||
| @ -91,7 +85,7 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro | ||||
| 	memory.objects.OnEvicted = memory.evictObject | ||||
| 
 | ||||
| 	// set up memory expiration | ||||
| 	go memory.expireObjects() | ||||
| 	go memory.expireLRUObjects() | ||||
| 
 | ||||
| 	go start(ctrlChannel, errorChannel) | ||||
| 	return ctrlChannel, errorChannel, memory | ||||
| @ -111,12 +105,13 @@ func (memory *memoryDriver) GetObject(w io.Writer, bucket string, object string) | ||||
| 	if !drivers.IsValidObject(object) { | ||||
| 		return 0, iodine.New(drivers.ObjectNameInvalid{Object: object}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		return 0, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	// get object | ||||
| 	storedBucket := memory.storedBuckets[bucket] | ||||
| 	// form objectKey | ||||
| 	objectKey := bucket + "/" + object | ||||
| 	if _, ok := memory.objectMetadata[objectKey]; ok { | ||||
| 	if _, ok := storedBucket.objectMetadata[objectKey]; ok { | ||||
| 		if data, ok := memory.objects.Get(objectKey); ok { | ||||
| 			dataSlice := data.([]byte) | ||||
| 			objectBuffer := bytes.NewBuffer(dataSlice) | ||||
| @ -155,10 +150,10 @@ func (memory *memoryDriver) GetBucketMetadata(bucket string) (drivers.BucketMeta | ||||
| 	if !drivers.IsValidBucket(bucket) { | ||||
| 		return drivers.BucketMetadata{}, iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		return drivers.BucketMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	return memory.bucketMetadata[bucket].metadata, nil | ||||
| 	return memory.storedBuckets[bucket].bucketMetadata, nil | ||||
| } | ||||
| 
 | ||||
| // SetBucketMetadata - | ||||
| @ -168,7 +163,7 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.BucketNameInvalid{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| @ -178,9 +173,9 @@ func (memory *memoryDriver) SetBucketMetadata(bucket, acl string) error { | ||||
| 	memory.lock.RUnlock() | ||||
| 	memory.lock.Lock() | ||||
| 	defer memory.lock.Unlock() | ||||
| 	storedBucket := memory.bucketMetadata[bucket] | ||||
| 	storedBucket.metadata.ACL = drivers.BucketACL(acl) | ||||
| 	memory.bucketMetadata[bucket] = storedBucket | ||||
| 	storedBucket := memory.storedBuckets[bucket] | ||||
| 	storedBucket.bucketMetadata.ACL = drivers.BucketACL(acl) | ||||
| 	memory.storedBuckets[bucket] = storedBucket | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| @ -214,12 +209,14 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	storedBucket := memory.storedBuckets[bucket] | ||||
| 	// get object key | ||||
| 	objectKey := bucket + "/" + key | ||||
| 	if _, ok := memory.objectMetadata[objectKey]; ok == true { | ||||
| 	if _, ok := storedBucket.objectMetadata[objectKey]; ok == true { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) | ||||
| 	} | ||||
| @ -228,9 +225,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| 	if contentType == "" { | ||||
| 		contentType = "application/octet-stream" | ||||
| 	} | ||||
| 
 | ||||
| 	contentType = strings.TrimSpace(contentType) | ||||
| 
 | ||||
| 	if strings.TrimSpace(expectedMD5Sum) != "" { | ||||
| 		expectedMD5SumBytes, err := base64.StdEncoding.DecodeString(strings.TrimSpace(expectedMD5Sum)) | ||||
| 		if err != nil { | ||||
| @ -241,7 +236,6 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| 	} | ||||
| 
 | ||||
| 	var bytesBuffer bytes.Buffer | ||||
| 	var newObject = storedObject{} | ||||
| 
 | ||||
| 	chunks := split.Stream(data, 10*1024*1024) | ||||
| 	totalLength := 0 | ||||
| @ -266,11 +260,13 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| 	// Verify if the written object is equal to what is expected, only if it is requested as such | ||||
| 	if strings.TrimSpace(expectedMD5Sum) != "" { | ||||
| 		if err := isMD5SumEqual(strings.TrimSpace(expectedMD5Sum), md5Sum); err != nil { | ||||
| 			memory.lock.Lock() | ||||
| 			defer memory.lock.Unlock() | ||||
| 			memory.objects.RemoveOldest() | ||||
| 			return iodine.New(drivers.BadDigest{Md5: expectedMD5Sum, Bucket: bucket, Key: key}, nil) | ||||
| 		} | ||||
| 	} | ||||
| 	newObject.metadata = drivers.ObjectMetadata{ | ||||
| 	newObject := drivers.ObjectMetadata{ | ||||
| 		Bucket: bucket, | ||||
| 		Key:    key, | ||||
| 
 | ||||
| @ -279,20 +275,22 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| 		Md5:         md5Sum, | ||||
| 		Size:        int64(totalLength), | ||||
| 	} | ||||
| 	newObject.lastAccessed = time.Now() | ||||
| 	memory.lock.Lock() | ||||
| 	if _, ok := memory.objectMetadata[objectKey]; ok == true { | ||||
| 		memory.objects.RemoveOldest() | ||||
| 		memory.lock.Unlock() | ||||
| 		return iodine.New(drivers.ObjectExists{Bucket: bucket, Object: key}, nil) | ||||
| 	memoryObject := make(map[string]drivers.ObjectMetadata) | ||||
| 	if len(memory.storedBuckets[bucket].objectMetadata) == 0 { | ||||
| 		storedBucket.objectMetadata = memoryObject | ||||
| 		storedBucket.objectMetadata[objectKey] = newObject | ||||
| 	} else { | ||||
| 		storedBucket.objectMetadata[objectKey] = newObject | ||||
| 	} | ||||
| 	memory.objectMetadata[objectKey] = newObject | ||||
| 	memory.storedBuckets[bucket] = storedBucket | ||||
| 	memory.objects.Add(objectKey, bytesBuffer.Bytes()) | ||||
| 	memory.totalSize = memory.totalSize + uint64(newObject.metadata.Size) | ||||
| 	for memory.totalSize > memory.maxSize { | ||||
| 	memory.totalSize = memory.totalSize + uint64(newObject.Size) | ||||
| 	if memory.totalSize > memory.maxSize { | ||||
| 		memory.objects.RemoveOldest() | ||||
| 	} | ||||
| 	memory.lock.Unlock() | ||||
| 	// free memory if possible for kernel to reclaim | ||||
| 	debug.FreeOSMemory() | ||||
| 	return nil | ||||
| } | ||||
| @ -300,7 +298,7 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su | ||||
| // CreateBucket - create bucket in memory | ||||
| func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { | ||||
| 	memory.lock.RLock() | ||||
| 	if len(memory.bucketMetadata) == totalBuckets { | ||||
| 	if len(memory.storedBuckets) == totalBuckets { | ||||
| 		memory.lock.RLock() | ||||
| 		return iodine.New(drivers.TooManyBuckets{Bucket: bucketName}, nil) | ||||
| 	} | ||||
| @ -312,7 +310,7 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.InvalidACL{ACL: acl}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucketName]; ok == true { | ||||
| 	if _, ok := memory.storedBuckets[bucketName]; ok == true { | ||||
| 		memory.lock.RUnlock() | ||||
| 		return iodine.New(drivers.BucketExists{Bucket: bucketName}, nil) | ||||
| 	} | ||||
| @ -323,14 +321,14 @@ func (memory *memoryDriver) CreateBucket(bucketName, acl string) error { | ||||
| 		acl = "private" | ||||
| 	} | ||||
| 	var newBucket = storedBucket{} | ||||
| 	newBucket.metadata = drivers.BucketMetadata{} | ||||
| 	newBucket.metadata.Name = bucketName | ||||
| 	newBucket.metadata.Created = time.Now() | ||||
| 	newBucket.metadata.ACL = drivers.BucketACL(acl) | ||||
| 	newBucket.objectMetadata = make(map[string]drivers.ObjectMetadata) | ||||
| 	newBucket.bucketMetadata = drivers.BucketMetadata{} | ||||
| 	newBucket.bucketMetadata.Name = bucketName | ||||
| 	newBucket.bucketMetadata.Created = time.Now() | ||||
| 	newBucket.bucketMetadata.ACL = drivers.BucketACL(acl) | ||||
| 	memory.lock.Lock() | ||||
| 	defer memory.lock.Unlock() | ||||
| 	memory.bucketMetadata[bucketName] = newBucket | ||||
| 
 | ||||
| 	memory.storedBuckets[bucketName] = newBucket | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| @ -403,12 +401,13 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR | ||||
| 	if !drivers.IsValidObject(resources.Prefix) { | ||||
| 		return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.ObjectNameInvalid{Object: resources.Prefix}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		return nil, drivers.BucketResourcesMetadata{IsTruncated: false}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	var results []drivers.ObjectMetadata | ||||
| 	var keys []string | ||||
| 	for key := range memory.objectMetadata { | ||||
| 	storedBucket := memory.storedBuckets[bucket] | ||||
| 	for key := range storedBucket.objectMetadata { | ||||
| 		if strings.HasPrefix(key, bucket+"/") { | ||||
| 			key = key[len(bucket)+1:] | ||||
| 			keys, resources = memory.listObjectsInternal(keys, key, resources) | ||||
| @ -420,10 +419,8 @@ func (memory *memoryDriver) ListObjects(bucket string, resources drivers.BucketR | ||||
| 			resources.IsTruncated = true | ||||
| 			return results, resources, nil | ||||
| 		} | ||||
| 		object := memory.objectMetadata[bucket+"/"+key] | ||||
| 		if bucket == object.metadata.Bucket { | ||||
| 			results = append(results, object.metadata) | ||||
| 		} | ||||
| 		object := storedBucket.objectMetadata[bucket+"/"+key] | ||||
| 		results = append(results, object) | ||||
| 	} | ||||
| 	return results, resources, nil | ||||
| } | ||||
| @ -445,8 +442,8 @@ func (memory *memoryDriver) ListBuckets() ([]drivers.BucketMetadata, error) { | ||||
| 	memory.lock.RLock() | ||||
| 	defer memory.lock.RUnlock() | ||||
| 	var results []drivers.BucketMetadata | ||||
| 	for _, bucket := range memory.bucketMetadata { | ||||
| 		results = append(results, bucket.metadata) | ||||
| 	for _, bucket := range memory.storedBuckets { | ||||
| 		results = append(results, bucket.bucketMetadata) | ||||
| 	} | ||||
| 	sort.Sort(ByBucketName(results)) | ||||
| 	return results, nil | ||||
| @ -463,38 +460,48 @@ func (memory *memoryDriver) GetObjectMetadata(bucket, key, prefix string) (drive | ||||
| 	if !drivers.IsValidObject(key) || !drivers.IsValidObject(prefix) { | ||||
| 		return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNameInvalid{Object: key}, nil) | ||||
| 	} | ||||
| 	if _, ok := memory.bucketMetadata[bucket]; ok == false { | ||||
| 	if _, ok := memory.storedBuckets[bucket]; ok == false { | ||||
| 		return drivers.ObjectMetadata{}, iodine.New(drivers.BucketNotFound{Bucket: bucket}, nil) | ||||
| 	} | ||||
| 	storedBucket := memory.storedBuckets[bucket] | ||||
| 	objectKey := bucket + "/" + key | ||||
| 	if object, ok := memory.objectMetadata[objectKey]; ok == true { | ||||
| 		return object.metadata, nil | ||||
| 	if object, ok := storedBucket.objectMetadata[objectKey]; ok == true { | ||||
| 		return object, nil | ||||
| 	} | ||||
| 	return drivers.ObjectMetadata{}, iodine.New(drivers.ObjectNotFound{Bucket: bucket, Object: key}, nil) | ||||
| } | ||||
| 
 | ||||
| func (memory *memoryDriver) evictObject(key lru.Key, value interface{}) { | ||||
| 	k := key.(string) | ||||
| 	memory.totalSize = memory.totalSize - uint64(memory.objectMetadata[k].metadata.Size) | ||||
| 	delete(memory.objectMetadata, k) | ||||
| 	// loop through all buckets | ||||
| 	for bucket, storedBucket := range memory.storedBuckets { | ||||
| 		memory.totalSize = memory.totalSize - uint64(storedBucket.objectMetadata[k].Size) | ||||
| 		log.Printf("Evicting: %s of Size: %d", k, storedBucket.objectMetadata[k].Size) | ||||
| 		log.Println("TotalSize:", memory.totalSize) | ||||
| 		delete(storedBucket.objectMetadata, k) | ||||
| 		// remove bucket if no objects found anymore | ||||
| 		if len(storedBucket.objectMetadata) == 0 { | ||||
| 			delete(memory.storedBuckets, bucket) | ||||
| 		} | ||||
| 	} | ||||
| 	// free memory for kernel to reclaim if possible | ||||
| 	debug.FreeOSMemory() | ||||
| } | ||||
| 
 | ||||
| func (memory *memoryDriver) expireObjects() { | ||||
| func (memory *memoryDriver) expireLRUObjects() { | ||||
| 	for { | ||||
| 		if memory.shutdown { | ||||
| 			return | ||||
| 		} | ||||
| 		var sleepDuration time.Duration | ||||
| 		memory.lock.Lock() | ||||
| 		if len(memory.objectMetadata) > 0 { | ||||
| 		if memory.objects.Len() > 0 { | ||||
| 			if k, _, ok := memory.objects.GetOldest(); ok { | ||||
| 				key := k.(string) | ||||
| 				object := memory.objectMetadata[key] | ||||
| 				if time.Now().Sub(object.lastAccessed) > memory.expiration { | ||||
| 				if time.Now().Sub(memory.lastAccessedObjects[key]) > memory.expiration { | ||||
| 					memory.objects.RemoveOldest() | ||||
| 				} else { | ||||
| 					sleepDuration = memory.expiration - time.Now().Sub(object.lastAccessed) | ||||
| 					sleepDuration = memory.expiration - time.Now().Sub(memory.lastAccessedObjects[key]) | ||||
| 				} | ||||
| 			} | ||||
| 		} else { | ||||
| @ -508,8 +515,5 @@ func (memory *memoryDriver) expireObjects() { | ||||
| func (memory *memoryDriver) updateAccessTime(key string) { | ||||
| 	memory.lock.Lock() | ||||
| 	defer memory.lock.Unlock() | ||||
| 	if object, ok := memory.objectMetadata[key]; ok { | ||||
| 		object.lastAccessed = time.Now() | ||||
| 		memory.objectMetadata[key] = object | ||||
| 	} | ||||
| 	memory.lastAccessedObjects[key] = time.Now() | ||||
| } | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user