Purge stale object cache entry (#2770)

This commit is contained in:
Krishnan Parthasarathi 2016-09-23 19:55:28 -07:00 committed by Harshavardhana
parent 27e474b3d2
commit 669783f875
3 changed files with 38 additions and 6 deletions

View File

@ -121,7 +121,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
if xlMeta.Stat.Size > 0 && xl.objCacheEnabled { if xlMeta.Stat.Size > 0 && xl.objCacheEnabled {
// Validate if we have previous cache. // Validate if we have previous cache.
var cachedBuffer io.ReadSeeker var cachedBuffer io.ReadSeeker
cachedBuffer, err = xl.objCache.Open(path.Join(bucket, object)) cachedBuffer, err = xl.objCache.Open(path.Join(bucket, object), modTime)
if err == nil { // Cache hit. if err == nil { // Cache hit.
// Advance the buffer to offset as if it was read. // Advance the buffer to offset as if it was read.
if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset. if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset.

View File

@ -163,7 +163,9 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
// Open - open the in-memory file, returns an in memory read seeker. // Open - open the in-memory file, returns an in memory read seeker.
// returns an error ErrNotFoundInCache, if the key does not exist. // returns an error ErrNotFoundInCache, if the key does not exist.
func (c *Cache) Open(key string) (io.ReadSeeker, error) { // Returns ErrKeyNotFoundInCache if entry's lastAccessedTime is older
// than objModTime.
func (c *Cache) Open(key string, objModTime time.Time) (io.ReadSeeker, error) {
// Entry exists, return the readable buffer. // Entry exists, return the readable buffer.
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
@ -171,6 +173,11 @@ func (c *Cache) Open(key string) (io.ReadSeeker, error) {
if !ok { if !ok {
return nil, ErrKeyNotFoundInCache return nil, ErrKeyNotFoundInCache
} }
// Check if buf is recent copy of the object on disk.
if buf.lastAccessed.Before(objModTime) {
c.delete(key)
return nil, ErrKeyNotFoundInCache
}
buf.lastAccessed = time.Now().UTC() buf.lastAccessed = time.Now().UTC()
return bytes.NewReader(buf.value), nil return bytes.NewReader(buf.value), nil
} }

View File

@ -57,7 +57,9 @@ func TestObjExpiry(t *testing.T) {
} }
// Wait for 500 millisecond. // Wait for 500 millisecond.
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
_, err = cache.Open("test") // Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry.
fakeObjModTime := time.Time{}
_, err = cache.Open("test", fakeObjModTime)
if err != testCase.err { if err != testCase.err {
t.Errorf("Test case 1 expected %s, got instead %s", testCase.err, err) t.Errorf("Test case 1 expected %s, got instead %s", testCase.err, err)
} }
@ -65,6 +67,9 @@ func TestObjExpiry(t *testing.T) {
// TestObjCache - tests various cases for object cache behavior. // TestObjCache - tests various cases for object cache behavior.
func TestObjCache(t *testing.T) { func TestObjCache(t *testing.T) {
// Setting objModTime to the beginning of golang's time.Time to avoid deletion of stale entry.
fakeObjModTime := time.Time{}
// Non exhaustive list of all object cache behavior cases. // Non exhaustive list of all object cache behavior cases.
testCases := []struct { testCases := []struct {
expiry time.Duration expiry time.Duration
@ -117,7 +122,7 @@ func TestObjCache(t *testing.T) {
// Test 1 validating Open failure. // Test 1 validating Open failure.
testCase := testCases[0] testCase := testCases[0]
cache := New(testCase.cacheSize, testCase.expiry) cache := New(testCase.cacheSize, testCase.expiry)
_, err := cache.Open("test") _, err := cache.Open("test", fakeObjModTime)
if testCase.err != err { if testCase.err != err {
t.Errorf("Test case 2 expected to pass, failed instead %s", err) t.Errorf("Test case 2 expected to pass, failed instead %s", err)
} }
@ -157,7 +162,7 @@ func TestObjCache(t *testing.T) {
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err) t.Errorf("Test case 4 expected to pass, failed instead %s", err)
} }
r, err := cache.Open("test") r, err := cache.Open("test", fakeObjModTime)
if err != nil { if err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err) t.Errorf("Test case 4 expected to pass, failed instead %s", err)
} }
@ -186,7 +191,7 @@ func TestObjCache(t *testing.T) {
} }
// Delete the cache entry. // Delete the cache entry.
cache.Delete("test") cache.Delete("test")
_, err = cache.Open("test") _, err = cache.Open("test", fakeObjModTime)
if testCase.err != err { if testCase.err != err {
t.Errorf("Test case 5 expected to pass, failed instead %s", err) t.Errorf("Test case 5 expected to pass, failed instead %s", err)
} }
@ -237,3 +242,23 @@ func TestObjCache(t *testing.T) {
t.Errorf("Test case 7 expected to fail, passed instead") t.Errorf("Test case 7 expected to fail, passed instead")
} }
} }
// TestStateEntryPurge - tests if objCache purges stale entry and returns ErrKeyNotFoundInCache.
func TestStaleEntryPurge(t *testing.T) {
cache := New(1024, NoExpiry)
w, err := cache.Create("test", 5)
if err != nil {
t.Errorf("Test case expected to pass, failed instead %s", err)
}
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case expected to pass, failed instead %s", err)
}
_, err = cache.Open("test", time.Now().AddDate(0, 0, 1).UTC())
if err != ErrKeyNotFoundInCache {
t.Errorf("Test case expected to return ErrKeyNotFoundInCache, instead returned %s", err)
}
}