XL/PutObject: Add single putObject and multipart caching. (#2115)

- Additionally adds test cases as well for object cache.
- Adds auto-expiry with expiration and cleanup time interval.

Fixes #2080
Fixes #2091
This commit is contained in:
Harshavardhana 2016-07-08 20:34:27 -07:00 committed by Anand Babu (AB) Periasamy
parent b0c180b77c
commit c0c8a8430e
10 changed files with 482 additions and 81 deletions

39
docs/caching.md Normal file
View File

@ -0,0 +1,39 @@
## Object caching
Object caching by turned on by default with following settings
- Default cache size 8GB, can be changed from environment variable
``MINIO_CACHE_SIZE`` supports both SI and ISO IEC standard forms
for input size parameters.
- Default expiration of entries is 72 hours, can be changed from
environment variable ``MINIO_CACHE_EXPIRY`` supportings Go
``time.Duration`` with valid units "ns", "us" (or "µs"),
"ms", "s", "m", "h".
- Default expiry interval is 1/4th of the expiration hours, so
expiration sweep happens across the cache every 1/4th the time
duration of the set entry expiration duration.
### Tricks
Setting MINIO_CACHE_SIZE=0 will turn off caching entirely.
Setting MINIO_CACHE_EXPIRY=0s will turn off cache garbage collections,
all cached objects will never expire.
### Behavior
Caching happens for both GET and PUT.
- GET caches new objects for entries not found in cache,
otherwise serves from the cache.
- PUT/POST caches all successfully uploaded objects.
NOTE: Cache is not populated if there are any errors
while reading from the disk.
Expiration happens automatically based on the configured
interval as explained above, frequently accessed objects
stay alive for significantly longer time due to the fact
that expiration time is reset for every cache hit.

View File

@ -16,7 +16,10 @@
package main package main
import "github.com/fatih/color" import (
"github.com/fatih/color"
"github.com/minio/minio/pkg/objcache"
)
// Global constants for Minio. // Global constants for Minio.
const ( const (
@ -45,6 +48,8 @@ var (
globalMaxConn = 0 globalMaxConn = 0
// Maximum cache size. // Maximum cache size.
globalMaxCacheSize = uint64(maxCacheSize) globalMaxCacheSize = uint64(maxCacheSize)
// Cache expiry.
globalCacheExpiry = objcache.DefaultExpiry
// Add new variable global values here. // Add new variable global values here.
) )

View File

@ -29,7 +29,7 @@ type nsParam struct {
// nsLock - provides primitives for locking critical namespace regions. // nsLock - provides primitives for locking critical namespace regions.
type nsLock struct { type nsLock struct {
*sync.RWMutex sync.RWMutex
ref uint ref uint
} }
@ -37,7 +37,7 @@ type nsLock struct {
// Unlock, RLock and RUnlock. // Unlock, RLock and RUnlock.
type nsLockMap struct { type nsLockMap struct {
lockMap map[nsParam]*nsLock lockMap map[nsParam]*nsLock
mutex *sync.Mutex mutex sync.Mutex
} }
// Global name space lock. // Global name space lock.
@ -47,7 +47,6 @@ var nsMutex *nsLockMap
func initNSLock() { func initNSLock() {
nsMutex = &nsLockMap{ nsMutex = &nsLockMap{
lockMap: make(map[nsParam]*nsLock), lockMap: make(map[nsParam]*nsLock),
mutex: &sync.Mutex{},
} }
} }
@ -59,7 +58,6 @@ func (n *nsLockMap) lock(volume, path string, readLock bool) {
nsLk, found := n.lockMap[param] nsLk, found := n.lockMap[param]
if !found { if !found {
nsLk = &nsLock{ nsLk = &nsLock{
RWMutex: &sync.RWMutex{},
ref: 0, ref: 0,
} }
n.lockMap[param] = nsLk n.lockMap[param] = nsLk

62
pkg/objcache/README.md Normal file
View File

@ -0,0 +1,62 @@
```
PACKAGE DOCUMENTATION
package objcache
import "github.com/minio/minio/pkg/objcache"
Package objcache implements in memory caching methods.
VARIABLES
var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs.
DefaultExpiry represents default time duration value when individual
entries will be expired.
var ErrCacheFull = errors.New("Not enough space in cache")
ErrCacheFull - cache is full.
var ErrKeyNotFoundInCache = errors.New("Key not found in cache")
ErrKeyNotFoundInCache - key not found in cache.
var NoExpiry = time.Duration(0)
NoExpiry represents caches to be permanent and can only be deleted.
TYPES
type Cache struct {
// OnEviction - callback function for eviction
OnEviction func(key string)
// contains filtered or unexported fields
}
Cache holds the required variables to compose an in memory cache system
which also provides expiring key mechanism and also maxSize.
func New(maxSize uint64, expiry time.Duration) *Cache
New - Return a new cache with a given default expiry duration. If the
expiry duration is less than one (or NoExpiry), the items in the cache
never expire (by default), and must be deleted manually.
func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error)
Create - validates if object size fits with in cache size limit and
returns a io.WriteCloser to which object contents can be written and
finally Close()'d. During Close() we checks if the amount of data
written is equal to the size of the object, in which case it saves the
contents to object cache.
func (c *Cache) Delete(key string)
Delete - delete deletes an entry from the cache.
func (c *Cache) DeleteExpired()
DeleteExpired - deletes all the expired entries from the cache.
func (c *Cache) Open(key string) (io.ReadSeeker, error)
Open - open the in-memory file, returns an in memory read seeker.
returns an error ErrNotFoundInCache, if the key does not exist.
func (c *Cache) StopExpiry()
StopExpiry sends a message to the expiry routine to stop expiring cached
entries. NOTE: once this is called, cached entries will not be expired
if the consume has called this.
```

View File

@ -26,15 +26,25 @@ import (
"time" "time"
) )
// NoExpiration represents caches to be permanent and can only be deleted. // NoExpiry represents caches to be permanent and can only be deleted.
var NoExpiration = time.Duration(0) var NoExpiry = time.Duration(0)
// DefaultExpiry represents default time duration value when individual entries will be expired.
var DefaultExpiry = time.Duration(72 * time.Hour) // 72hrs.
// buffer represents the in memory cache of a single entry.
// buffer carries value of the data and last accessed time.
type buffer struct {
value []byte // Value of the entry.
lastAccessed time.Time // Represents time when value was last accessed.
}
// Cache holds the required variables to compose an in memory cache system // Cache holds the required variables to compose an in memory cache system
// which also provides expiring key mechanism and also maxSize. // which also provides expiring key mechanism and also maxSize.
type Cache struct { type Cache struct {
// Mutex is used for handling the concurrent // Mutex is used for handling the concurrent
// read/write requests for cache // read/write requests for cache
mutex *sync.RWMutex mutex sync.Mutex
// maxSize is a total size for overall cache // maxSize is a total size for overall cache
maxSize uint64 maxSize uint64
@ -43,29 +53,39 @@ type Cache struct {
currentSize uint64 currentSize uint64
// OnEviction - callback function for eviction // OnEviction - callback function for eviction
OnEviction func(a ...interface{}) OnEviction func(key string)
// totalEvicted counter to keep track of total expirations // totalEvicted counter to keep track of total expirys
totalEvicted int totalEvicted int
// map of objectName and its contents // map of objectName and its contents
entries map[string][]byte entries map[string]*buffer
// Expiration in time duration. // Expiry in time duration.
expiry time.Duration expiry time.Duration
// Stop garbage collection routine, stops any running GC routine.
stopGC chan struct{}
} }
// New creates an inmemory cache // New - Return a new cache with a given default expiry duration.
// // If the expiry duration is less than one (or NoExpiry),
// maxSize is used for expiring objects before we run out of memory // the items in the cache never expire (by default), and must be deleted
// expiration is used for expiration of a key from cache // manually.
func New(maxSize uint64, expiry time.Duration) *Cache { func New(maxSize uint64, expiry time.Duration) *Cache {
return &Cache{ C := &Cache{
mutex: &sync.RWMutex{},
maxSize: maxSize, maxSize: maxSize,
entries: make(map[string][]byte), entries: make(map[string]*buffer),
expiry: expiry, expiry: expiry,
} }
// We have expiry start the janitor routine.
if expiry > 0 {
C.stopGC = make(chan struct{})
// Start garbage collection routine to expire objects.
C.startGC()
}
return C
} }
// ErrKeyNotFoundInCache - key not found in cache. // ErrKeyNotFoundInCache - key not found in cache.
@ -74,17 +94,19 @@ var ErrKeyNotFoundInCache = errors.New("Key not found in cache")
// ErrCacheFull - cache is full. // ErrCacheFull - cache is full.
var ErrCacheFull = errors.New("Not enough space in cache") var ErrCacheFull = errors.New("Not enough space in cache")
// ErrExcessData - excess data was attempted to be written on cache.
var ErrExcessData = errors.New("Attempted excess write on cache")
// Used for adding entry to the object cache. Implements io.WriteCloser // Used for adding entry to the object cache. Implements io.WriteCloser
type cacheBuffer struct { type cacheBuffer struct {
*bytes.Buffer // Implements io.Writer *bytes.Buffer // Implements io.Writer
onClose func() onClose func() error
} }
// On close, onClose() is called which checks if all object contents // On close, onClose() is called which checks if all object contents
// have been written so that it can save the buffer to the cache. // have been written so that it can save the buffer to the cache.
func (c cacheBuffer) Close() error { func (c cacheBuffer) Close() (err error) {
c.onClose() return c.onClose()
return nil
} }
// Create - validates if object size fits with in cache size limit and returns a io.WriteCloser // Create - validates if object size fits with in cache size limit and returns a io.WriteCloser
@ -92,39 +114,43 @@ func (c cacheBuffer) Close() error {
// checks if the amount of data written is equal to the size of the object, in which // checks if the amount of data written is equal to the size of the object, in which
// case it saves the contents to object cache. // case it saves the contents to object cache.
func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) { func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
c.mutex.Lock() // Recovers any panic generated and return errors appropriately.
defer c.mutex.Unlock() defer func() {
if r := recover(); r != nil {
// Recover any panic and return ErrCacheFull.
err = ErrCacheFull
}
}() // Do not crash the server.
valueLen := uint64(size) valueLen := uint64(size)
if c.maxSize > 0 {
// Check if the size of the object is not bigger than the capacity of the cache. // Check if the size of the object is not bigger than the capacity of the cache.
if valueLen > c.maxSize { if c.maxSize > 0 && valueLen > c.maxSize {
return nil, ErrCacheFull return nil, ErrCacheFull
} }
// TODO - auto expire random key.
if c.currentSize+valueLen > c.maxSize {
return nil, ErrCacheFull
}
}
// Will hold the object contents. // Will hold the object contents.
buf := bytes.NewBuffer(make([]byte, 0, size)) buf := bytes.NewBuffer(make([]byte, 0, size))
// Account for the memory allocated above.
c.currentSize += uint64(size)
// Function called on close which saves the object contents // Function called on close which saves the object contents
// to the object cache. // to the object cache.
onClose := func() { onClose := func() error {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if buf.Len() != int(size) { if size != int64(buf.Len()) {
// Full object not available hence do not save buf to object cache. // Full object not available hence do not save buf to object cache.
c.currentSize -= uint64(size) return io.ErrShortBuffer
return }
if c.maxSize > 0 && c.currentSize+valueLen > c.maxSize {
return ErrExcessData
} }
// Full object available in buf, save it to cache. // Full object available in buf, save it to cache.
c.entries[key] = buf.Bytes() c.entries[key] = &buffer{
return value: buf.Bytes(),
lastAccessed: time.Now().UTC(), // Save last accessed time.
}
// Account for the memory allocated above.
c.currentSize += uint64(size)
return nil
} }
// Object contents that is written - cacheBuffer.Write(data) // Object contents that is written - cacheBuffer.Write(data)
@ -138,35 +164,77 @@ func (c *Cache) Create(key string, size int64) (w io.WriteCloser, err error) {
// Open - open the in-memory file, returns an in memory read seeker. // Open - open the in-memory file, returns an in memory read seeker.
// returns an error ErrNotFoundInCache, if the key does not exist. // returns an error ErrNotFoundInCache, if the key does not exist.
func (c *Cache) Open(key string) (io.ReadSeeker, error) { func (c *Cache) Open(key string) (io.ReadSeeker, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()
// Entry exists, return the readable buffer. // Entry exists, return the readable buffer.
buffer, ok := c.entries[key] c.mutex.Lock()
defer c.mutex.Unlock()
buf, ok := c.entries[key]
if !ok { if !ok {
return nil, ErrKeyNotFoundInCache return nil, ErrKeyNotFoundInCache
} }
return bytes.NewReader(buffer), nil buf.lastAccessed = time.Now().UTC()
return bytes.NewReader(buf.value), nil
} }
// Delete - delete deletes an entry from in-memory fs. // Delete - delete deletes an entry from the cache.
func (c *Cache) Delete(key string) { func (c *Cache) Delete(key string) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() c.delete(key)
c.mutex.Unlock()
// Delete an entry.
buffer, ok := c.entries[key]
if ok {
c.deleteEntry(key, int64(len(buffer)))
}
}
// Deletes the entry that was found.
func (c *Cache) deleteEntry(key string, size int64) {
delete(c.entries, key)
c.currentSize -= uint64(size)
c.totalEvicted++
if c.OnEviction != nil { if c.OnEviction != nil {
c.OnEviction(key) c.OnEviction(key)
} }
} }
// gc - garbage collect all the expired entries from the cache.
func (c *Cache) gc() {
var evictedEntries []string
c.mutex.Lock()
for k, v := range c.entries {
if c.expiry > 0 && time.Now().UTC().Sub(v.lastAccessed) > c.expiry {
c.delete(k)
evictedEntries = append(evictedEntries, k)
}
}
c.mutex.Unlock()
for _, k := range evictedEntries {
if c.OnEviction != nil {
c.OnEviction(k)
}
}
}
// StopGC sends a message to the expiry routine to stop
// expiring cached entries. NOTE: once this is called, cached
// entries will not be expired if the consumer has called this.
func (c *Cache) StopGC() {
if c.stopGC != nil {
c.stopGC <- struct{}{}
}
}
// startGC starts running a routine ticking at expiry interval, on each interval
// this routine does a sweep across the cache entries and garbage collects all the
// expired entries.
func (c *Cache) startGC() {
go func() {
for {
select {
// Wait till cleanup interval and initiate delete expired entries.
case <-time.After(c.expiry / 4):
c.gc()
// Stop the routine, usually called by the user of object cache during cleanup.
case <-c.stopGC:
return
}
}
}()
}
// Deletes a requested entry from the cache.
func (c *Cache) delete(key string) {
if buf, ok := c.entries[key]; ok {
delete(c.entries, key)
c.currentSize -= uint64(len(buf.value))
c.totalEvicted++
}
}

View File

@ -0,0 +1,172 @@
/*
* Minio Cloud Storage, (C) 2016 Minio, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package objcache
import (
"bytes"
"io"
"io/ioutil"
"testing"
"time"
)
// TestObjCache - tests various cases for object cache behavior.
func TestObjCache(t *testing.T) {
// Non exhaustive list of all object cache behavior cases.
testCases := []struct {
expiry time.Duration
cacheSize uint64
err error
closeErr error
}{
// Validate if a key is not found in cache and Open fails.
{
expiry: NoExpiry,
cacheSize: 1024,
err: ErrKeyNotFoundInCache,
},
// Validate if cache indicates that it is full and Create fails.
{
expiry: NoExpiry,
cacheSize: 1,
err: ErrCacheFull,
},
// Validate if Create succeeds but Close fails to write to buffer.
{
expiry: NoExpiry,
cacheSize: 2,
closeErr: io.ErrShortBuffer,
},
// Validate that Create and Close succeed, making sure to update the cache.
{
expiry: NoExpiry,
cacheSize: 1024,
},
// Validate that Delete succeeds and Open fails with key not found in cache.
{
expiry: NoExpiry,
cacheSize: 1024,
err: ErrKeyNotFoundInCache,
},
// Validate OnEviction function is called upon entry delete.
{
expiry: NoExpiry,
cacheSize: 1024,
},
}
// Test 1 validating Open failure.
testCase := testCases[0]
cache := New(testCase.cacheSize, testCase.expiry)
_, err := cache.Open("test")
if testCase.err != err {
t.Errorf("Test case 2 expected to pass, failed instead %s", err)
}
// Test 2 validating Create failure.
testCase = testCases[1]
cache = New(testCase.cacheSize, testCase.expiry)
_, err = cache.Create("test", 2)
if testCase.err != err {
t.Errorf("Test case 2 expected to pass, failed instead %s", err)
}
// Test 3 validating Create succeeds and returns a writer.
// Subsequently we Close() without writing any data, to receive
// `io.ErrShortBuffer`
testCase = testCases[2]
cache = New(testCase.cacheSize, testCase.expiry)
w, err := cache.Create("test", 1)
if testCase.err != err {
t.Errorf("Test case 3 expected to pass, failed instead %s", err)
}
if err = w.Close(); err != testCase.closeErr {
t.Errorf("Test case 3 expected to pass, failed instead %s", err)
}
// Test 4 validates Create and Close succeeds successfully caching
// the writes.
testCase = testCases[3]
cache = New(testCase.cacheSize, testCase.expiry)
w, err = cache.Create("test", 5)
if testCase.err != err {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
r, err := cache.Open("test")
if err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
// Reads everything stored for key "test".
cbytes, err := ioutil.ReadAll(r)
if err != nil {
t.Errorf("Test case 4 expected to pass, failed instead %s", err)
}
// Validate if read bytes match.
if !bytes.Equal(cbytes, []byte("Hello")) {
t.Errorf("Test case 4 expected to pass. wanted \"Hello\", got %s", string(cbytes))
}
// Test 5 validates Delete succeeds and Open fails with err
testCase = testCases[4]
cache = New(testCase.cacheSize, testCase.expiry)
w, err = cache.Create("test", 5)
if err != nil {
t.Errorf("Test case 5 expected to pass, failed instead %s", err)
}
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 5 expected to pass, failed instead %s", err)
}
// Delete the cache entry.
cache.Delete("test")
_, err = cache.Open("test")
if testCase.err != err {
t.Errorf("Test case 5 expected to pass, failed instead %s", err)
}
// Test 6 validates OnEviction being called upon Delete is being invoked.
testCase = testCases[5]
cache = New(testCase.cacheSize, testCase.expiry)
w, err = cache.Create("test", 5)
if err != nil {
t.Errorf("Test case 6 expected to pass, failed instead %s", err)
}
// Write '5' bytes.
w.Write([]byte("Hello"))
// Close to successfully save into cache.
if err = w.Close(); err != nil {
t.Errorf("Test case 6 expected to pass, failed instead %s", err)
}
var deleteKey string
cache.OnEviction = func(key string) {
deleteKey = key
}
// Delete the cache entry.
cache.Delete("test")
if deleteKey != "test" {
t.Errorf("Test case 6 expected to pass, wanted \"test\", got %s", deleteKey)
}
}

View File

@ -149,6 +149,14 @@ func initServerConfig(c *cli.Context) {
fatalIf(err, "Unable to convert MINIO_CACHE_SIZE=%s environment variable into its integer value.", maxCacheSizeStr) fatalIf(err, "Unable to convert MINIO_CACHE_SIZE=%s environment variable into its integer value.", maxCacheSizeStr)
} }
// Fetch cache expiry from environment variable.
if cacheExpiryStr := os.Getenv("MINIO_CACHE_EXPIRY"); cacheExpiryStr != "" {
// We need to parse cache expiry to its time.Duration value.
var err error
globalCacheExpiry, err = time.ParseDuration(cacheExpiryStr)
fatalIf(err, "Unable to convert MINIO_CACHE_EXPIRY=%s environment variable into its time.Duration value.", cacheExpiryStr)
}
// Fetch access keys from environment variables if any and update the config. // Fetch access keys from environment variables if any and update the config.
accessKey := os.Getenv("MINIO_ACCESS_KEY") accessKey := os.Getenv("MINIO_ACCESS_KEY")
secretKey := os.Getenv("MINIO_SECRET_KEY") secretKey := os.Getenv("MINIO_SECRET_KEY")

View File

@ -21,6 +21,7 @@ import (
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"path" "path"
"path/filepath" "path/filepath"
"strings" "strings"
@ -683,7 +684,19 @@ func (xl xlObjects) CompleteMultipartUpload(bucket string, object string, upload
} }
// Hold write lock on the destination before rename. // Hold write lock on the destination before rename.
nsMutex.Lock(bucket, object) nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object) defer func() {
// A new complete multipart upload invalidates any
// previously cached object in memory.
xl.objCache.Delete(path.Join(bucket, object))
// This lock also protects the cache namespace.
nsMutex.Unlock(bucket, object)
// Prefetch the object from disk by triggerring a fake GetObject call
// Unlike a regular single PutObject, multipart PutObject is comes in
// stages and it is harder to cache.
go xl.GetObject(bucket, object, 0, objectSize, ioutil.Discard)
}()
// Rename if an object already exists to temporary location. // Rename if an object already exists to temporary location.
uniqueID := getUUID() uniqueID := getUUID()

View File

@ -114,7 +114,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
mw := writer mw := writer
// Object cache enabled block. // Object cache enabled block.
if xl.objCacheEnabled { if xlMeta.Stat.Size > 0 && xl.objCacheEnabled {
// Validate if we have previous cache. // Validate if we have previous cache.
cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object)) cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object))
if err == nil { // Cache hit. if err == nil { // Cache hit.
@ -144,9 +144,10 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
mw = io.MultiWriter(newBuffer, writer) mw = io.MultiWriter(newBuffer, writer)
defer newBuffer.Close() defer newBuffer.Close()
} }
// Ignore error if cache is full, proceed to write the object.
if err != nil && err != objcache.ErrCacheFull { if err != nil && err != objcache.ErrCacheFull {
// Perhaps cache is full, returns here. // For any other error return here.
return err return toObjectErr(err, bucket, object)
} }
} }
} }
@ -170,7 +171,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i
// Start reading the part name. // Start reading the part name.
n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize)
if err != nil { if err != nil {
return err return toObjectErr(err, bucket, object)
} }
totalBytesRead += n totalBytesRead += n
@ -340,7 +341,7 @@ func (xl xlObjects) renameObject(srcBucket, srcObject, dstBucket, dstObject stri
// until EOF, erasure codes the data across all disk and additionally // until EOF, erasure codes the data across all disk and additionally
// writes `xl.json` which carries the necessary metadata for future // writes `xl.json` which carries the necessary metadata for future
// object operations. // object operations.
func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (string, error) { func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.Reader, metadata map[string]string) (md5Sum string, err error) {
// Verify if bucket is valid. // Verify if bucket is valid.
if !IsValidBucketName(bucket) { if !IsValidBucketName(bucket) {
return "", BucketNameInvalid{Bucket: bucket} return "", BucketNameInvalid{Bucket: bucket}
@ -359,14 +360,15 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
if metadata == nil { if metadata == nil {
metadata = make(map[string]string) metadata = make(map[string]string)
} }
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
uniqueID := getUUID() uniqueID := getUUID()
tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "part.1") tempErasureObj := path.Join(tmpMetaPrefix, uniqueID, "part.1")
minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix) minioMetaTmpBucket := path.Join(minioMetaBucket, tmpMetaPrefix)
tempObj := uniqueID tempObj := uniqueID
// Lock the object.
nsMutex.Lock(bucket, object)
defer nsMutex.Unlock(bucket, object)
// Initialize xl meta. // Initialize xl meta.
xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks) xlMeta := newXLMetaV1(object, xl.dataBlocks, xl.parityBlocks)
@ -388,9 +390,33 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
higherVersion++ higherVersion++
} }
var mw io.Writer
// Initialize md5 writer. // Initialize md5 writer.
md5Writer := md5.New() md5Writer := md5.New()
// Proceed to set the cache.
var newBuffer io.WriteCloser
// If caching is enabled, proceed to set the cache.
if size > 0 && xl.objCacheEnabled {
// PutObject invalidates any previously cached object in memory.
xl.objCache.Delete(path.Join(bucket, object))
// Create a new entry in memory of size.
newBuffer, err = xl.objCache.Create(path.Join(bucket, object), size)
if err == nil {
// Create a multi writer to write to both memory and client response.
mw = io.MultiWriter(newBuffer, md5Writer)
}
// Ignore error if cache is full, proceed to write the object.
if err != nil && err != objcache.ErrCacheFull {
// For any other error return here.
return "", toObjectErr(err, bucket, object)
}
} else {
mw = md5Writer
}
// Limit the reader to its provided size if specified. // Limit the reader to its provided size if specified.
var limitDataReader io.Reader var limitDataReader io.Reader
if size > 0 { if size > 0 {
@ -403,7 +429,7 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
} }
// Tee reader combines incoming data stream and md5, data read from input stream is written to md5. // Tee reader combines incoming data stream and md5, data read from input stream is written to md5.
teeReader := io.TeeReader(limitDataReader, md5Writer) teeReader := io.TeeReader(limitDataReader, mw)
// Collect all the previous erasure infos across the disk. // Collect all the previous erasure infos across the disk.
var eInfos []erasureInfo var eInfos []erasureInfo
@ -505,6 +531,12 @@ func (xl xlObjects) PutObject(bucket string, object string, size int64, data io.
// Delete the temporary object. // Delete the temporary object.
xl.deleteObject(minioMetaTmpBucket, newUniqueID) xl.deleteObject(minioMetaTmpBucket, newUniqueID)
// Once we have successfully renamed the object, Close the buffer which would
// save the object on cache.
if size > 0 && xl.objCacheEnabled && newBuffer != nil {
newBuffer.Close()
}
// Return md5sum, successfully wrote object. // Return md5sum, successfully wrote object.
return newMD5Hex, nil return newMD5Hex, nil
} }

View File

@ -158,16 +158,20 @@ func newXLObjects(disks []string) (ObjectLayer, error) {
// Calculate data and parity blocks. // Calculate data and parity blocks.
dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2 dataBlocks, parityBlocks := len(newPosixDisks)/2, len(newPosixDisks)/2
// Initialize object cache.
objCache := objcache.New(globalMaxCacheSize, globalCacheExpiry)
// Initialize list pool.
listPool := newTreeWalkPool(globalLookupTimeout)
// Initialize xl objects. // Initialize xl objects.
xl := xlObjects{ xl := xlObjects{
physicalDisks: disks, physicalDisks: disks,
storageDisks: newPosixDisks, storageDisks: newPosixDisks,
dataBlocks: dataBlocks, dataBlocks: dataBlocks,
parityBlocks: parityBlocks, parityBlocks: parityBlocks,
// Inititalize list pool. listPool: listPool,
listPool: newTreeWalkPool(globalLookupTimeout), objCache: objCache,
// Initialize object caching, FIXME: support auto cache expiration.
objCache: objcache.New(globalMaxCacheSize, objcache.NoExpiration),
objCacheEnabled: globalMaxCacheSize > 0, objCacheEnabled: globalMaxCacheSize > 0,
} }