Merge pull request #596 from harshavardhana/pr_out_avoid_one_last_memory_copy_wip

This commit is contained in:
Harshavardhana 2015-05-14 11:29:59 -07:00
commit 4f7ae8af92
3 changed files with 38 additions and 42 deletions

View File

@ -63,7 +63,7 @@ func HTTPHandler(driver drivers.Driver) http.Handler {
// h = quota.BandwidthCap(h, 100*1024*1024, time.Duration(24*time.Hour)) // h = quota.BandwidthCap(h, 100*1024*1024, time.Duration(24*time.Hour))
// h = quota.RequestLimit(h, 100, time.Duration(30*time.Minute)) // h = quota.RequestLimit(h, 100, time.Duration(30*time.Minute))
// h = quota.RequestLimit(h, 1000, time.Duration(24*time.Hour)) // h = quota.RequestLimit(h, 1000, time.Duration(24*time.Hour))
h = quota.ConnectionLimit(h, 5) h = quota.ConnectionLimit(h, 4)
h = logging.LogHandler(h) h = logging.LogHandler(h)
return h return h
} }

View File

@ -43,7 +43,7 @@ import (
type memoryDriver struct { type memoryDriver struct {
storedBuckets map[string]storedBucket storedBuckets map[string]storedBucket
lock *sync.RWMutex lock *sync.RWMutex
objects *Intelligent objects *Cache
} }
type storedBucket struct { type storedBucket struct {
@ -69,7 +69,7 @@ func Start(maxSize uint64, expiration time.Duration) (chan<- string, <-chan erro
var memory *memoryDriver var memory *memoryDriver
memory = new(memoryDriver) memory = new(memoryDriver)
memory.storedBuckets = make(map[string]storedBucket) memory.storedBuckets = make(map[string]storedBucket)
memory.objects = NewIntelligent(maxSize, expiration) memory.objects = NewCache(maxSize, expiration)
memory.lock = new(sync.RWMutex) memory.lock = new(sync.RWMutex)
memory.objects.OnExpired = memory.expiredObject memory.objects.OnExpired = memory.expiredObject
@ -207,30 +207,6 @@ func (memory *memoryDriver) CreateObject(bucket, key, contentType, expectedMD5Su
return md5sum, iodine.New(err, nil) return md5sum, iodine.New(err, nil)
} }
// getMD5AndData - this is written as a wrapper to capture md5sum and data in a more memory efficient way
func getMD5AndData(reader io.Reader) ([]byte, []byte, error) {
hash := md5.New()
var data []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = reader.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer
// It is necessary for us to verify this and break
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
data = append(data, byteBuffer[0:length]...)
}
if err != io.EOF {
return nil, nil, err
}
return hash.Sum(nil), data, nil
}
// createObject - PUT object to memory buffer // createObject - PUT object to memory buffer
func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) { func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Sum string, size int64, data io.Reader) (string, error) {
memory.lock.RLock() memory.lock.RLock()
@ -267,15 +243,35 @@ func (memory *memoryDriver) createObject(bucket, key, contentType, expectedMD5Su
} }
expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes) expectedMD5Sum = hex.EncodeToString(expectedMD5SumBytes)
} }
md5SumBytes, readBytes, err := getMD5AndData(data)
if err != nil { // calculate md5
hash := md5.New()
var readBytes []byte
var err error
var length int
for err == nil {
byteBuffer := make([]byte, 1024*1024)
length, err = data.Read(byteBuffer)
// While hash.Write() wouldn't mind a Nil byteBuffer
// It is necessary for us to verify this and break
if length == 0 {
break
}
hash.Write(byteBuffer[0:length])
readBytes = append(readBytes, byteBuffer[0:length]...)
}
if err != io.EOF {
return "", iodine.New(err, nil) return "", iodine.New(err, nil)
} }
go debug.FreeOSMemory()
md5SumBytes := hash.Sum(nil)
totalLength := len(readBytes) totalLength := len(readBytes)
memory.lock.Lock() memory.lock.Lock()
memory.objects.Set(objectKey, readBytes) memory.objects.Set(objectKey, readBytes)
memory.lock.Unlock() memory.lock.Unlock()
// de-allocating // setting up for de-allocation
readBytes = nil readBytes = nil
md5Sum := hex.EncodeToString(md5SumBytes) md5Sum := hex.EncodeToString(md5SumBytes)

View File

@ -23,9 +23,9 @@ import (
var zeroExpiration = time.Duration(0) var zeroExpiration = time.Duration(0)
// Intelligent holds the required variables to compose an in memory cache system // Cache holds the required variables to compose an in memory cache system
// which also provides expiring key mechanism and also maxSize // which also provides expiring key mechanism and also maxSize
type Intelligent struct { type Cache struct {
// Mutex is used for handling the concurrent // Mutex is used for handling the concurrent
// read/write requests for cache // read/write requests for cache
sync.Mutex sync.Mutex
@ -62,12 +62,12 @@ type Stats struct {
Expired uint64 Expired uint64
} }
// NewIntelligent creates an inmemory cache // NewCache creates an inmemory cache
// //
// maxSize is used for expiring objects before we run out of memory // maxSize is used for expiring objects before we run out of memory
// expiration is used for expiration of a key from cache // expiration is used for expiration of a key from cache
func NewIntelligent(maxSize uint64, expiration time.Duration) *Intelligent { func NewCache(maxSize uint64, expiration time.Duration) *Cache {
return &Intelligent{ return &Cache{
items: map[string]interface{}{}, items: map[string]interface{}{},
updatedAt: map[string]time.Time{}, updatedAt: map[string]time.Time{},
expiration: expiration, expiration: expiration,
@ -76,7 +76,7 @@ func NewIntelligent(maxSize uint64, expiration time.Duration) *Intelligent {
} }
// Stats get current cache statistics // Stats get current cache statistics
func (r *Intelligent) Stats() Stats { func (r *Cache) Stats() Stats {
return Stats{ return Stats{
Bytes: r.currentSize, Bytes: r.currentSize,
Items: uint64(len(r.items)), Items: uint64(len(r.items)),
@ -85,7 +85,7 @@ func (r *Intelligent) Stats() Stats {
} }
// ExpireObjects expire objects in go routine // ExpireObjects expire objects in go routine
func (r *Intelligent) ExpireObjects(gcInterval time.Duration) { func (r *Cache) ExpireObjects(gcInterval time.Duration) {
r.stopExpireTimer = make(chan struct{}) r.stopExpireTimer = make(chan struct{})
ticker := time.NewTicker(gcInterval) ticker := time.NewTicker(gcInterval)
go func() { go func() {
@ -103,7 +103,7 @@ func (r *Intelligent) ExpireObjects(gcInterval time.Duration) {
} }
// Get returns a value of a given key if it exists // Get returns a value of a given key if it exists
func (r *Intelligent) Get(key string) (interface{}, bool) { func (r *Cache) Get(key string) (interface{}, bool) {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
value, ok := r.items[key] value, ok := r.items[key]
@ -115,7 +115,7 @@ func (r *Intelligent) Get(key string) (interface{}, bool) {
} }
// Set will persist a value to the cache // Set will persist a value to the cache
func (r *Intelligent) Set(key string, value interface{}) { func (r *Cache) Set(key string, value interface{}) {
r.Lock() r.Lock()
// remove random key if only we reach the maxSize threshold, // remove random key if only we reach the maxSize threshold,
// if not assume infinite memory // if not assume infinite memory
@ -135,7 +135,7 @@ func (r *Intelligent) Set(key string, value interface{}) {
} }
// Expire expires keys which have expired // Expire expires keys which have expired
func (r *Intelligent) Expire() { func (r *Cache) Expire() {
r.Lock() r.Lock()
defer r.Unlock() defer r.Unlock()
for key := range r.items { for key := range r.items {
@ -146,7 +146,7 @@ func (r *Intelligent) Expire() {
} }
// Delete deletes a given key if exists // Delete deletes a given key if exists
func (r *Intelligent) Delete(key string) { func (r *Cache) Delete(key string) {
if _, ok := r.items[key]; ok { if _, ok := r.items[key]; ok {
r.currentSize -= uint64(len(r.items[key].([]byte))) r.currentSize -= uint64(len(r.items[key].([]byte)))
delete(r.items, key) delete(r.items, key)
@ -158,7 +158,7 @@ func (r *Intelligent) Delete(key string) {
} }
} }
func (r *Intelligent) isValid(key string) bool { func (r *Cache) isValid(key string) bool {
updatedAt, ok := r.updatedAt[key] updatedAt, ok := r.updatedAt[key]
if !ok { if !ok {
return false return false