diff --git a/pkg/storage/drivers/memory/blockingwriter.go b/pkg/storage/drivers/memory/blockingwriter.go new file mode 100644 index 000000000..8c63a2d7c --- /dev/null +++ b/pkg/storage/drivers/memory/blockingwriter.go @@ -0,0 +1,65 @@ +/* + * Minimalist Object Storage, (C) 2015 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "io" + "sync" +) + +// BlockingWriteCloser is a WriteCloser that blocks until released +type BlockingWriteCloser struct { + w io.WriteCloser + release *sync.WaitGroup + err error +} + +// Write to the underlying writer +func (b *BlockingWriteCloser) Write(p []byte) (int, error) { + n, err := b.w.Write(p) + if err != nil { + b.err = err + } + return n, b.err +} + +// Close blocks until another goroutine calls Release(error). Returns error code if either +// writer fails or Release is called with an error. +func (b *BlockingWriteCloser) Close() error { + err := b.w.Close() + if err != nil { + b.err = err + } + b.release.Wait() + return b.err +} + +// Release the Close, causing it to unblock. Only call this once. Calling it multiple times results in a panic. +func (b *BlockingWriteCloser) Release(err error) { + b.release.Done() + if err != nil { + b.err = err + } + return +} + +// NewBlockingWriteCloser Creates a new write closer that must be released by the read consumer. +func NewBlockingWriteCloser(w io.WriteCloser) *BlockingWriteCloser { + wg := &sync.WaitGroup{} + wg.Add(1) + return &BlockingWriteCloser{w: w, release: wg} +} diff --git a/pkg/storage/drivers/memory/lru.go b/pkg/storage/drivers/memory/lru.go index 99e277bce..07dc3c426 100644 --- a/pkg/storage/drivers/memory/lru.go +++ b/pkg/storage/drivers/memory/lru.go @@ -93,6 +93,7 @@ func (c *Cache) Stats() CacheStats { // Add adds a value to the cache. func (c *Cache) Add(key Key, size int64) io.WriteCloser { r, w := io.Pipe() + blockingWriter := NewBlockingWriteCloser(w) go func() { if uint64(size) > c.MaxSize { err := iodine.New(drivers.EntityTooLarge{ @@ -100,6 +101,7 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { MaxSize: strconv.FormatUint(c.MaxSize, 10), }, nil) r.CloseWithError(err) + blockingWriter.Release(err) return } // If MaxSize is zero expecting infinite memory @@ -109,14 +111,18 @@ func (c *Cache) Add(key Key, size int64) io.WriteCloser { value := new(bytes.Buffer) n, err := io.CopyN(value, r, size) if err != nil { - r.CloseWithError(iodine.New(err, nil)) + err := iodine.New(err, nil) + r.CloseWithError(err) + blockingWriter.Release(err) return } ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.totalSize += uint64(n) + r.Close() + blockingWriter.Release(nil) }() - return w + return blockingWriter } // Get looks up a key's value from the cache. diff --git a/pkg/storage/drivers/memory/memory_test.go b/pkg/storage/drivers/memory/memory_test.go index 3ed145fd8..3a2b9c96d 100644 --- a/pkg/storage/drivers/memory/memory_test.go +++ b/pkg/storage/drivers/memory/memory_test.go @@ -18,6 +18,7 @@ package memory import ( "testing" + "time" . "github.com/minio-io/check" "github.com/minio-io/minio/pkg/storage/drivers" @@ -31,7 +32,7 @@ var _ = Suite(&MySuite{}) func (s *MySuite) TestAPISuite(c *C) { create := func() drivers.Driver { - _, _, store := Start(10000000, 0) + _, _, store := Start(1000000, 3*time.Hour) return store } drivers.APITestSuite(c, create)