diff --git a/globals.go b/globals.go index 431568159..21182abf0 100644 --- a/globals.go +++ b/globals.go @@ -43,6 +43,8 @@ var ( // Maximum connections handled per // server, defaults to 0 (unlimited). globalMaxConn = 0 + // Maximum cache size. + globalMaxCacheSize = uint64(maxCacheSize) // Add new variable global values here. ) diff --git a/object-api_test.go b/object-api_test.go index e4293810a..cdb8cd142 100644 --- a/object-api_test.go +++ b/object-api_test.go @@ -16,9 +16,7 @@ package main -import ( - . "gopkg.in/check.v1" -) +import . "gopkg.in/check.v1" type ObjectLayerAPISuite struct{} @@ -28,6 +26,7 @@ var _ = Suite(&ObjectLayerAPISuite{}) func (s *ObjectLayerAPISuite) TestFSAPISuite(c *C) { // Initialize name space lock. initNSLock() + // function which creates a temp FS backend and executes the object layer suite test. execObjectLayerSuiteTestFS := func(objSuiteTest objSuiteTestType) { // create temp object layer backend. @@ -51,6 +50,7 @@ type objSuiteTestType func(c *C, obj ObjectLayer) func (s *ObjectLayerAPISuite) TestXLAPISuite(c *C) { // Initialize name space lock. initNSLock() + // function which creates a temp XL backend and executes the object layer suite test. execObjectLayerSuiteTestXL := func(objSuiteTest objSuiteTestType) { // create temp object layer backend. diff --git a/object-handlers.go b/object-handlers.go index 4560f2c92..4de55d4bf 100644 --- a/object-handlers.go +++ b/object-handlers.go @@ -115,6 +115,7 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req return } + // Calculate the http Range. var hrange *httpRange hrange, err = getRequestedRange(r.Header.Get("Range"), objInfo.Size) if err != nil { @@ -144,22 +145,23 @@ func (api objectAPIHandlers) GetObjectHandler(w http.ResponseWriter, r *http.Req if length == 0 { length = objInfo.Size - startOffset } + + // Reads the object at startOffset and writes to w. 
if err := api.ObjectAPI.GetObject(bucket, object, startOffset, length, w); err != nil { - errorIf(err, "Writing to client failed.") + errorIf(err, "Unable to write to client.") // Do not send error response here, client would have already died. return } + // Success. } -var unixEpochTime = time.Unix(0, 0) - // checkLastModified implements If-Modified-Since and // If-Unmodified-Since checks. // // modtime is the modification time of the resource to be served, or // IsZero(). return value is whether this request is now complete. func checkLastModified(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { - if modtime.IsZero() || modtime.Equal(unixEpochTime) { + if modtime.IsZero() || modtime.Equal(time.Unix(0, 0)) { // If the object doesn't have a modtime (IsZero), or the modtime // is obviously garbage (Unix time == 0), then ignore modtimes // and don't process the If-Modified-Since header. @@ -452,7 +454,7 @@ func (api objectAPIHandlers) CopyObjectHandler(w http.ResponseWriter, r *http.Re // modtime is the modification time of the resource to be served, or // IsZero(). return value is whether this request is now complete. func checkCopySourceLastModified(w http.ResponseWriter, r *http.Request, modtime time.Time) bool { - if modtime.IsZero() || modtime.Equal(unixEpochTime) { + if modtime.IsZero() || modtime.Equal(time.Unix(0, 0)) { // If the object doesn't have a modtime (IsZero), or the modtime // is obviously garbage (Unix time == 0), then ignore modtimes // and don't process the If-Modified-Since header. diff --git a/pkg/objcache/buffer.go b/pkg/objcache/buffer.go new file mode 100644 index 000000000..c99e49aa0 --- /dev/null +++ b/pkg/objcache/buffer.go @@ -0,0 +1,143 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package objcache + +import ( + "bytes" + "errors" + "io" + "time" +) + +// A Buffer is a variable-sized buffer of bytes with Read, Write and Seek methods. +// The zero value for Buffer is an empty buffer ready to use. +type Buffer struct { + buf []byte // contents are the bytes buf[off : len(buf)] + off int // read at &buf[off], write at &buf[len(buf)] + bootstrap [64]byte // memory to hold first slice; helps small buffers (Printf) avoid allocation. + accessTime time.Time // accessTime holds value of the last access time of this buffer. +} + +// NewBuffer creates and initializes a new Buffer using buf as its initial +// contents. It is intended to prepare a Buffer to read existing data. It +// can also be used to size the internal buffer for writing. To do that, +// buf should have the desired capacity but a length of zero. +// +// In most cases, new(Buffer) (or just declaring a Buffer variable) is +// sufficient to initialize a Buffer. +func NewBuffer(buf []byte) *Buffer { return &Buffer{buf: buf} } + +// Len returns the number of bytes of the unread portion of the buffer; +// that is, the bytes between the read offset and the end of the buffer. +func (b *Buffer) Len() int { return len(b.buf) - b.off } + +// Size returns the current length of the underlying byte slice, +// counting both the bytes already read and the bytes still unread. +// The returned value is not affected by Read or Seek, but it does +// change when data is appended with Write. +func (b *Buffer) Size() int64 { return int64(len(b.buf)) } + +// makeSlice allocates a slice of size n. 
If the allocation fails, it panics +// with ErrTooLarge. +func makeSlice(n int) []byte { + // If the make fails, give a known error. + defer func() { + if recover() != nil { + panic(bytes.ErrTooLarge) + } + }() + return make([]byte, n) +} + +// grow grows the buffer to guarantee space for n more bytes. +// It returns the index where bytes should be written. +// If the buffer can't grow it will panic with ErrTooLarge. +func (b *Buffer) grow(n int) int { + m := b.Len() + // If buffer is empty, reset to recover space. + if m == 0 && b.off != 0 { + // Reuse buffer space. + b.buf = b.buf[0:0] + } + if len(b.buf)+n > cap(b.buf) { + var buf []byte + if b.buf == nil && n <= len(b.bootstrap) { + buf = b.bootstrap[0:] + } else if m+n <= cap(b.buf)/2 { + // We can slide things down instead of allocating a new + // slice. We only need m+n <= cap(b.buf) to slide, but + // we instead let capacity get twice as large so we + // don't spend all our time copying. + copy(b.buf[:], b.buf[b.off:]) + buf = b.buf[:m] + } else { + // not enough space anywhere + buf = makeSlice(2*cap(b.buf) + n) + copy(buf, b.buf[b.off:]) + } + b.buf = buf + b.off = 0 + } + b.buf = b.buf[0 : b.off+m+n] + return b.off + m +} + +// Write appends the contents of p to the buffer, growing the buffer as +// needed. The return value n is the length of p; err is always nil. If the +// buffer becomes too large, Write will panic with ErrTooLarge. +func (b *Buffer) Write(p []byte) (n int, err error) { + m := b.grow(len(p)) + return copy(b.buf[m:], p), nil +} + +// Read reads the next len(p) bytes from the buffer or until the buffer +// is drained. The return value n is the number of bytes read. If the +// buffer has no data to return, err is io.EOF (unless len(p) is zero); +// otherwise it is nil. 
+func (b *Buffer) Read(p []byte) (n int, err error) { + if len(p) == 0 { + return 0, nil + } + if b.off >= len(b.buf) { + return 0, io.EOF + } + n = copy(p, b.buf[b.off:]) + b.off += n + return +} + +// Seek implements the io.Seeker interface. +func (b *Buffer) Seek(offset int64, whence int) (int64, error) { + var abs int64 + switch whence { + case 0: // Whence 0 sets the offset as new offset. + abs = offset + case 1: // Whence 1 sets the current offset and offset as new offset. + abs = int64(b.off) + offset + case 2: // Whence 2 sets the total size of the buffer and offset + // as new offset, not supported yet. // FIXME. + return 0, errors.New("cache.Buffer.Seek: whence os.SEEK_END is not supported") + default: + return 0, errors.New("cache.Buffer.Seek: invalid whence") + } + if abs < 0 { + return 0, errors.New("cache.Buffer.Seek: negative position") + } + b.off = int(abs) + return abs, nil +} diff --git a/pkg/objcache/objcache.go b/pkg/objcache/objcache.go new file mode 100644 index 000000000..518c92ae1 --- /dev/null +++ b/pkg/objcache/objcache.go @@ -0,0 +1,143 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package objcache implements in memory caching methods. +package objcache + +import ( + "errors" + "io" + "sync" + "time" +) + +// NoExpiration represents caches to be permanent and can only be deleted. 
+var NoExpiration = time.Duration(0) + +// Cache holds the required variables to compose an in memory cache system +// which also provides expiring key mechanism and also maxSize. +type Cache struct { + // Mutex is used for handling the concurrent + // read/write requests for cache + mutex *sync.RWMutex + + // maxSize is a total size for overall cache + maxSize uint64 + + // currentSize is a current size in memory + currentSize uint64 + + // OnEviction - callback function for eviction + OnEviction func(a ...interface{}) + + // totalEvicted counter to keep track of total expirations + totalEvicted int + + // Represents in memory file system. + entries map[string]*Buffer + + // Expiration in time duration. + expiry time.Duration +} + +// New creates an inmemory cache +// +// maxSize is used for expiring objects before we run out of memory +// expiration is used for expiration of a key from cache +func New(maxSize uint64, expiry time.Duration) *Cache { + return &Cache{ + mutex: &sync.RWMutex{}, + maxSize: maxSize, + entries: make(map[string]*Buffer), + expiry: expiry, + } +} + +// ErrKeyNotFoundInCache - key not found in cache. +var ErrKeyNotFoundInCache = errors.New("Key not found in cache") + +// ErrCacheFull - cache is full. +var ErrCacheFull = errors.New("Not enough space in cache") + +// Size returns length of the value of a given key, returns -1 if key doesn't exist +func (c *Cache) Size(key string) int64 { + c.mutex.RLock() + defer c.mutex.RUnlock() + _, ok := c.entries[key] + if ok { + return c.entries[key].Size() + } + return -1 +} + +// Create validates and returns an in memory writer referencing entry. +func (c *Cache) Create(key string, size int64) (io.Writer, error) { + c.mutex.Lock() + defer c.mutex.Unlock() + + valueLen := uint64(size) + if c.maxSize > 0 { + // Check if the size of the object is not bigger than the capacity of the cache. + if valueLen > c.maxSize { + return nil, ErrCacheFull + } + // TODO - auto expire random key. 
+ if c.currentSize+valueLen > c.maxSize { + return nil, ErrCacheFull + } + } + c.entries[key] = NewBuffer(make([]byte, 0, int(size))) + c.currentSize += valueLen + return c.entries[key], nil +} + +// Open - open the in-memory file, returns a memory reader positioned at offset 0. +// returns error ErrKeyNotFoundInCache if key does not exist. +func (c *Cache) Open(key string) (io.ReadSeeker, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + // Hand out a fresh Buffer over the entry's bytes so every reader seeks its own offset. + buffer, ok := c.entries[key] + if !ok { + return nil, ErrKeyNotFoundInCache + } + return NewBuffer(buffer.buf), nil +} + +// Delete - delete deletes an entry from in-memory fs. +func (c *Cache) Delete(key string) { + c.mutex.Lock() + defer c.mutex.Unlock() + + // Delete an entry. + buffer, ok := c.entries[key] + if ok { + size := buffer.Size() + c.deleteEntry(key, size) + } +} + +// Deletes the entry that was found. +func (c *Cache) deleteEntry(key string, size int64) { + delete(c.entries, key) + c.currentSize -= uint64(size) + c.totalEvicted++ + if c.OnEviction != nil { + c.OnEviction(key) + } +} diff --git a/pkg/objcache/objcache_test.go b/pkg/objcache/objcache_test.go new file mode 100644 index 000000000..70f2beadd --- /dev/null +++ b/pkg/objcache/objcache_test.go @@ -0,0 +1,95 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package objcache + +import ( + "io" + "io/ioutil" + "os" + "testing" +) + +// Tests different types of seekable operations on an allocated buffer. +func TestBufferSeek(t *testing.T) { + r := NewBuffer([]byte("0123456789")) + tests := []struct { + off int64 + seek int + n int + want string + wantpos int64 + seekerr string + }{ + {seek: os.SEEK_SET, off: 0, n: 20, want: "0123456789"}, + {seek: os.SEEK_SET, off: 1, n: 1, want: "1"}, + {seek: os.SEEK_CUR, off: 1, wantpos: 3, n: 2, want: "34"}, + {seek: os.SEEK_SET, off: -1, seekerr: "cache.Buffer.Seek: negative position"}, + {seek: os.SEEK_SET, off: 1 << 33, wantpos: 1 << 33}, + {seek: os.SEEK_CUR, off: 1, wantpos: 1<<33 + 1}, + {seek: os.SEEK_SET, n: 5, want: "01234"}, + {seek: os.SEEK_CUR, n: 5, want: "56789"}, + {seek: os.SEEK_END, off: -1, seekerr: "cache.Buffer.Seek: whence os.SEEK_END is not supported"}, + } + + for i, tt := range tests { + pos, err := r.Seek(tt.off, tt.seek) + if err == nil && tt.seekerr != "" { + t.Errorf("%d. want seek error %q", i, tt.seekerr) + continue + } + if err != nil && err.Error() != tt.seekerr { + t.Errorf("%d. seek error = %q; want %q", i, err.Error(), tt.seekerr) + continue + } + if tt.wantpos != 0 && tt.wantpos != pos { + t.Errorf("%d. pos = %d, want %d", i, pos, tt.wantpos) + } + buf := make([]byte, tt.n) + n, err := r.Read(buf) + if err != nil { + t.Errorf("%d. read = %v", i, err) + continue + } + got := string(buf[:n]) + if got != tt.want { + t.Errorf("%d. got %q; want %q", i, got, tt.want) + } + } +} + +// Tests read operation after big seek. +func TestReadAfterBigSeek(t *testing.T) { + r := NewBuffer([]byte("0123456789")) + if _, err := r.Seek(1<<31+5, os.SEEK_SET); err != nil { + t.Fatal(err) + } + if n, err := r.Read(make([]byte, 10)); n != 0 || err != io.EOF { + t.Errorf("Read = %d, %v; want 0, EOF", n, err) + } +} + +// tests that Len is affected by reads, but Size is not. 
+func TestBufferLenSize(t *testing.T) { + r := NewBuffer([]byte("abc")) + io.CopyN(ioutil.Discard, r, 1) + if r.Len() != 2 { + t.Errorf("Len = %d; want 2", r.Len()) + } + if r.Size() != 3 { + t.Errorf("Size = %d; want 3", r.Size()) + } +} diff --git a/server-main.go b/server-main.go index 703be5267..adbdfde35 100644 --- a/server-main.go +++ b/server-main.go @@ -141,6 +141,14 @@ func initServerConfig(c *cli.Context) { fatalIf(err, "Unable to convert MINIO_MAXCONN=%s environment variable into its integer value.", maxConnStr) } + // Fetch max cache size from environment variable. + if maxCacheSizeStr := os.Getenv("MINIO_CACHE_SIZE"); maxCacheSizeStr != "" { + // We need to parse cache size to its integer value. + var err error + globalMaxCacheSize, err = strconvBytes(maxCacheSizeStr) + fatalIf(err, "Unable to convert MINIO_CACHE_SIZE=%s environment variable into its integer value.", maxCacheSizeStr) + } + // Fetch access keys from environment variables if any and update the config. accessKey := os.Getenv("MINIO_ACCESS_KEY") secretKey := os.Getenv("MINIO_SECRET_KEY") diff --git a/strconv-bytes.go b/strconv-bytes.go new file mode 100644 index 000000000..daf5df477 --- /dev/null +++ b/strconv-bytes.go @@ -0,0 +1,117 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "fmt" + "math" + "strconv" + "strings" + "unicode" +) + +// IEC sizes, binary multiples of a byte (powers of 1024). +const ( + Byte = 1 << (iota * 10) + KiByte + MiByte + GiByte + TiByte + PiByte + EiByte +) + +// SI sizes, decimal multiples of a byte (powers of 1000). +const ( + IByte = 1 + KByte = IByte * 1000 + MByte = KByte * 1000 + GByte = MByte * 1000 + TByte = GByte * 1000 + PByte = TByte * 1000 + EByte = PByte * 1000 +) + +// This table represents both IEC and SI notations with their corresponding values. +var bytesSizeTable = map[string]uint64{ + "b": Byte, + "kib": KiByte, + "kb": KByte, + "mib": MiByte, + "mb": MByte, + "gib": GiByte, + "gb": GByte, + "tib": TiByte, + "tb": TByte, + "pib": PiByte, + "pb": PByte, + "eib": EiByte, + "eb": EByte, + // Without suffix + "": Byte, + "ki": KiByte, + "k": KByte, + "mi": MiByte, + "m": MByte, + "gi": GiByte, + "g": GByte, + "ti": TiByte, + "t": TByte, + "pi": PiByte, + "p": PByte, + "ei": EiByte, + "e": EByte, +} + +// strconvBytes parses a string representation of bytes into the number +// of bytes it represents. +// +// See Also: Bytes, IBytes. +// +// strconvBytes("42MB") -> 42000000, nil +// strconvBytes("42mib") -> 44040192, nil +func strconvBytes(s string) (uint64, error) { + lastDigit := 0 + // Find where the leading numeric part of the input ends. + for _, r := range s { + // This supports decimals as well. + if !(unicode.IsDigit(r) || r == '.') { + break + } + lastDigit++ + } + + // Float parsing to deal with decimal inputs. + f, err := strconv.ParseFloat(s[:lastDigit], 64) + if err != nil { + return 0, err + } + + // Fetch the corresponding byte size for notation. + byteSize := strings.ToLower(strings.TrimSpace(s[lastDigit:])) + size, ok := bytesSizeTable[byteSize] + if !ok { + return 0, fmt.Errorf("Unrecognized size notation name: %v", byteSize) + } + f *= float64(size) + // Return an error if final value overflows uint64 max. + if f >= math.MaxUint64 { + return 0, fmt.Errorf("too large: %v", s) + } + // Success. 
+ return uint64(f), nil +} diff --git a/strconv-bytes_test.go b/strconv-bytes_test.go new file mode 100644 index 000000000..a855b46b3 --- /dev/null +++ b/strconv-bytes_test.go @@ -0,0 +1,112 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "testing" + +// Tests various variants in supporting all the byte conversions. +func TestByteConv(t *testing.T) { + // List of all tests for testing notation to corresponding + // byte conversions. + tests := []struct { + in string + exp uint64 + }{ + // Using IEC notation. + {"42", 42}, + {"42MB", 42000000}, + {"42MiB", 44040192}, + {"42mb", 42000000}, + {"42mib", 44040192}, + {"42MIB", 44040192}, + {"42 MB", 42000000}, + {"42 MiB", 44040192}, + {"42 mb", 42000000}, + {"42 mib", 44040192}, + {"42 MIB", 44040192}, + {"42.5MB", 42500000}, + {"42.5MiB", 44564480}, + {"42.5 MB", 42500000}, + {"42.5 MiB", 44564480}, + // Using SI notation. + {"42M", 42000000}, + {"42Mi", 44040192}, + {"42m", 42000000}, + {"42mi", 44040192}, + {"42MI", 44040192}, + {"42 M", 42000000}, + {"42 Mi", 44040192}, + {"42 m", 42000000}, + {"42 mi", 44040192}, + {"42 MI", 44040192}, + // With decimal values. + {"42.5M", 42500000}, + {"42.5Mi", 44564480}, + {"42.5 M", 42500000}, + {"42.5 Mi", 44564480}, + // With no more digits after '.' + {"42.M", 42000000}, + {"42.Mi", 44040192}, + {"42. m", 42000000}, + {"42. mi", 44040192}, + {"42. M", 42000000}, + {"42. 
Mi", 44040192}, + // Large testing, breaks when too much larger than this. + {"12.5 EB", uint64(12.5 * float64(EByte))}, + {"12.5 E", uint64(12.5 * float64(EByte))}, + {"12.5 EiB", uint64(12.5 * float64(EiByte))}, + } + + // Tests all notation variants. + for _, p := range tests { + got, err := strconvBytes(p.in) + if err != nil { + t.Errorf("Couldn't parse %v: %v", p.in, err) + } + if got != p.exp { + t.Errorf("Expected %v for %v, got %v", p.exp, p.in, got) + } + } +} + +// Validates different types of input errors. +func TestByteErrors(t *testing.T) { + // Input with integer and double space between notations. + got, err := strconvBytes("84 JB") + if err == nil { + t.Errorf("Expected error, got %v", got) + } + // Empty string. + got, err = strconvBytes("") + if err == nil { + t.Errorf("Expected error parsing nothing") + } + // Too large. + got, err = strconvBytes("16 EiB") + if err == nil { + t.Errorf("Expected error, got %v", got) + } +} + +// Add benchmarks here. + +// Benchmarks for bytes converter. +func BenchmarkParseBytes(b *testing.B) { + for i := 0; i < b.N; i++ { + strconvBytes("16.5 GB") + } +} diff --git a/xl-v1-errors.go b/xl-v1-errors.go new file mode 100644 index 000000000..b07c47dc2 --- /dev/null +++ b/xl-v1-errors.go @@ -0,0 +1,37 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import "errors" + +// errXLMaxDisks - returned for reached maximum of disks. 
+var errXLMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'") + +// errXLMinDisks - returned for minimum number of disks. +var errXLMinDisks = errors.New("Minimum '6' disks are required to enable erasure code") + +// errXLNumDisks - returned for odd number of disks. +var errXLNumDisks = errors.New("Total number of disks should be multiples of '2'") + +// errXLReadQuorum - did not meet read quorum. +var errXLReadQuorum = errors.New("Read failed. Insufficient number of disks online") + +// errXLWriteQuorum - did not meet write quorum. +var errXLWriteQuorum = errors.New("Write failed. Insufficient number of disks online") + +// errXLDataCorrupt - err data corrupt. +var errXLDataCorrupt = errors.New("Data likely corrupted, read failed.") diff --git a/xl-v1-object.go b/xl-v1-object.go index dacc60fca..1e42cf091 100644 --- a/xl-v1-object.go +++ b/xl-v1-object.go @@ -27,6 +27,7 @@ import ( "time" "github.com/minio/minio/pkg/mimedb" + "github.com/minio/minio/pkg/objcache" ) /// Object Operations @@ -92,6 +93,45 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i eInfos = append(eInfos, metaArr[index].Erasure) } + // Save the writer. + mw := writer + + // Object cache enabled block. + if xl.objCacheEnabled { + // Validate if we have previous cache. + cachedBuffer, err := xl.objCache.Open(path.Join(bucket, object)) + if err == nil { // Cache hit. + // Advance the buffer to offset as if it was read. + if _, err = cachedBuffer.Seek(startOffset, 0); err != nil { // Seek to the offset. + return err + } + // Write the requested length. + if _, err = io.CopyN(writer, cachedBuffer, length); err != nil { + return err + } + return nil + } // Cache miss. + + // For unknown error, return and error out. + if err != objcache.ErrKeyNotFoundInCache { + return err + } // Cache has not been found, fill the cache. + + // Proceed to set the cache. + var newBuffer io.Writer + // Cache is only set if whole object is being read. 
+ if startOffset == 0 && length == xlMeta.Stat.Size { + // Create a new entry in memory of length. + newBuffer, err = xl.objCache.Create(path.Join(bucket, object), length) + if err == nil { + // Create a multi writer to write to both memory and client response. + mw = io.MultiWriter(newBuffer, writer) + } else if err != objcache.ErrCacheFull { + return err + } // A full cache is not fatal, the object is served to the client uncached. + } + } + totalBytesRead := int64(0) // Read from all parts. for ; partIndex <= lastPartIndex; partIndex++ { @@ -109,7 +149,7 @@ func (xl xlObjects) GetObject(bucket, object string, startOffset int64, length i } // Start reading the part name. - n, err := erasureReadFile(writer, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) + n, err := erasureReadFile(mw, onlineDisks, bucket, pathJoin(object, partName), partName, eInfos, partOffset, readSize, partSize) if err != nil { return err } diff --git a/xl-v1.go b/xl-v1.go index a346bbdaf..d5b4993ac 100644 --- a/xl-v1.go +++ b/xl-v1.go @@ -17,11 +17,11 @@ package main import ( - "errors" "fmt" "sort" "github.com/minio/minio/pkg/disk" + "github.com/minio/minio/pkg/objcache" ) // XL constants. @@ -37,6 +37,15 @@ const ( // Uploads metadata file carries per multipart object metadata. uploadsJSONFile = "uploads.json" + + // 8GiB cache by default. + maxCacheSize = 8 * 1024 * 1024 * 1024 + + // Maximum erasure blocks. + maxErasureBlocks = 16 + + // Minimum erasure blocks. + minErasureBlocks = 6 ) // xlObjects - Implements XL object layer. @@ -48,35 +57,16 @@ type xlObjects struct { readQuorum int // readQuorum minimum required disks to read data. writeQuorum int // writeQuorum minimum required disks to write data. - // List pool management. + // ListObjects pool management. listPool *treeWalkPool + + // Object cache for caching objects. + objCache *objcache.Cache + + // Object cache enabled. + objCacheEnabled bool } -// errXLMaxDisks - returned for reached maximum of disks. 
-var errXLMaxDisks = errors.New("Number of disks are higher than supported maximum count '16'") - -// errXLMinDisks - returned for minimum number of disks. -var errXLMinDisks = errors.New("Number of disks are smaller than supported minimum count '8'") - -// errXLNumDisks - returned for odd number of disks. -var errXLNumDisks = errors.New("Number of disks should be multiples of '2'") - -// errXLReadQuorum - did not meet read quorum. -var errXLReadQuorum = errors.New("I/O error. did not meet read quorum.") - -// errXLWriteQuorum - did not meet write quorum. -var errXLWriteQuorum = errors.New("I/O error. did not meet write quorum.") - -// errXLDataCorrupt - err data corrupt. -var errXLDataCorrupt = errors.New("data likely corrupted, all blocks are zero in length") - -const ( - // Maximum erasure blocks. - maxErasureBlocks = 16 - // Minimum erasure blocks. - minErasureBlocks = 6 -) - // Validate if input disks are sufficient for initializing XL. func checkSufficientDisks(disks []string) error { // Verify total number of disks. @@ -174,7 +164,11 @@ func newXLObjects(disks []string) (ObjectLayer, error) { storageDisks: newPosixDisks, dataBlocks: dataBlocks, parityBlocks: parityBlocks, - listPool: newTreeWalkPool(globalLookupTimeout), + // Initialize list pool. + listPool: newTreeWalkPool(globalLookupTimeout), + // Initialize object caching, FIXME: support auto cache expiration. + objCache: objcache.New(globalMaxCacheSize, objcache.NoExpiration), + objCacheEnabled: globalMaxCacheSize > 0, } // Figure out read and write quorum based on number of storage disks.