mirror of
https://github.com/minio/minio.git
synced 2025-01-15 08:45:00 -05:00
ab7d3cd508
Change distributed locking to allow taking bulk locks across objects, reduces usually 1000 calls to 1. Also allows for situations where multiple clients sends delete requests to objects with following names ``` {1,2,3,4,5} ``` ``` {5,4,3,2,1} ``` will block and ensure that we do not fail the request on each other.
359 lines
8.0 KiB
Go
359 lines
8.0 KiB
Go
/*
|
|
* Minio Cloud Storage, (C) 2016 Minio, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
// GOMAXPROCS=10 go test
|
|
|
|
package dsync_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"net/rpc"
|
|
"os"
|
|
"strconv"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
. "github.com/minio/minio/pkg/dsync"
|
|
)
|
|
|
|
var ds *Dsync
|
|
var rpcPaths []string // list of rpc paths where lock server is serving.
|
|
|
|
func startRPCServers(nodes []string) {
|
|
for i := range nodes {
|
|
server := rpc.NewServer()
|
|
server.RegisterName("Dsync", &lockServer{
|
|
mutex: sync.Mutex{},
|
|
lockMap: make(map[string]int64),
|
|
})
|
|
// For some reason the registration paths need to be different (even for different server objs)
|
|
server.HandleHTTP(rpcPaths[i], fmt.Sprintf("%s-debug", rpcPaths[i]))
|
|
l, e := net.Listen("tcp", ":"+strconv.Itoa(i+12345))
|
|
if e != nil {
|
|
log.Fatal("listen error:", e)
|
|
}
|
|
go http.Serve(l, nil)
|
|
}
|
|
|
|
// Let servers start
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
// TestMain initializes the testing framework
|
|
func TestMain(m *testing.M) {
|
|
const rpcPath = "/dsync"
|
|
|
|
rand.Seed(time.Now().UTC().UnixNano())
|
|
|
|
nodes := make([]string, 4) // list of node IP addrs or hostname with ports.
|
|
for i := range nodes {
|
|
nodes[i] = fmt.Sprintf("127.0.0.1:%d", i+12345)
|
|
}
|
|
for i := range nodes {
|
|
rpcPaths = append(rpcPaths, rpcPath+"-"+strconv.Itoa(i))
|
|
}
|
|
|
|
// Initialize net/rpc clients for dsync.
|
|
var clnts []NetLocker
|
|
for i := 0; i < len(nodes); i++ {
|
|
clnts = append(clnts, newClient(nodes[i], rpcPaths[i]))
|
|
}
|
|
|
|
ds = &Dsync{
|
|
GetLockersFn: func() []NetLocker { return clnts },
|
|
}
|
|
|
|
startRPCServers(nodes)
|
|
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
func TestSimpleLock(t *testing.T) {
|
|
|
|
dm := NewDRWMutex(context.Background(), ds, "test")
|
|
|
|
dm.Lock(id, source)
|
|
|
|
// fmt.Println("Lock acquired, waiting...")
|
|
time.Sleep(2500 * time.Millisecond)
|
|
|
|
dm.Unlock()
|
|
}
|
|
|
|
func TestSimpleLockUnlockMultipleTimes(t *testing.T) {
|
|
|
|
dm := NewDRWMutex(context.Background(), ds, "test")
|
|
|
|
dm.Lock(id, source)
|
|
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
|
|
dm.Unlock()
|
|
|
|
dm.Lock(id, source)
|
|
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
|
|
dm.Unlock()
|
|
|
|
dm.Lock(id, source)
|
|
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
|
|
dm.Unlock()
|
|
|
|
dm.Lock(id, source)
|
|
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
|
|
dm.Unlock()
|
|
|
|
dm.Lock(id, source)
|
|
time.Sleep(time.Duration(10+(rand.Float32()*50)) * time.Millisecond)
|
|
dm.Unlock()
|
|
}
|
|
|
|
// Test two locks for same resource, one succeeds, one fails (after timeout)
|
|
func TestTwoSimultaneousLocksForSameResource(t *testing.T) {
|
|
|
|
dm1st := NewDRWMutex(context.Background(), ds, "aap")
|
|
dm2nd := NewDRWMutex(context.Background(), ds, "aap")
|
|
|
|
dm1st.Lock(id, source)
|
|
|
|
// Release lock after 10 seconds
|
|
go func() {
|
|
time.Sleep(10 * time.Second)
|
|
// fmt.Println("Unlocking dm1")
|
|
|
|
dm1st.Unlock()
|
|
}()
|
|
|
|
dm2nd.Lock(id, source)
|
|
|
|
// fmt.Printf("2nd lock obtained after 1st lock is released\n")
|
|
time.Sleep(2500 * time.Millisecond)
|
|
|
|
dm2nd.Unlock()
|
|
}
|
|
|
|
// Test three locks for same resource, one succeeds, one fails (after timeout)
|
|
func TestThreeSimultaneousLocksForSameResource(t *testing.T) {
|
|
|
|
dm1st := NewDRWMutex(context.Background(), ds, "aap")
|
|
dm2nd := NewDRWMutex(context.Background(), ds, "aap")
|
|
dm3rd := NewDRWMutex(context.Background(), ds, "aap")
|
|
|
|
dm1st.Lock(id, source)
|
|
|
|
// Release lock after 10 seconds
|
|
go func() {
|
|
time.Sleep(10 * time.Second)
|
|
// fmt.Println("Unlocking dm1")
|
|
|
|
dm1st.Unlock()
|
|
}()
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
dm2nd.Lock(id, source)
|
|
|
|
// Release lock after 10 seconds
|
|
go func() {
|
|
time.Sleep(2500 * time.Millisecond)
|
|
// fmt.Println("Unlocking dm2")
|
|
|
|
dm2nd.Unlock()
|
|
}()
|
|
|
|
dm3rd.Lock(id, source)
|
|
|
|
// fmt.Printf("3rd lock obtained after 1st & 2nd locks are released\n")
|
|
time.Sleep(2500 * time.Millisecond)
|
|
|
|
dm3rd.Unlock()
|
|
}()
|
|
|
|
go func() {
|
|
defer wg.Done()
|
|
|
|
dm3rd.Lock(id, source)
|
|
|
|
// Release lock after 10 seconds
|
|
go func() {
|
|
time.Sleep(2500 * time.Millisecond)
|
|
// fmt.Println("Unlocking dm3")
|
|
|
|
dm3rd.Unlock()
|
|
}()
|
|
|
|
dm2nd.Lock(id, source)
|
|
|
|
// fmt.Printf("2nd lock obtained after 1st & 3rd locks are released\n")
|
|
time.Sleep(2500 * time.Millisecond)
|
|
|
|
dm2nd.Unlock()
|
|
}()
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
// Test two locks for different resources, both succeed
|
|
func TestTwoSimultaneousLocksForDifferentResources(t *testing.T) {
|
|
|
|
dm1 := NewDRWMutex(context.Background(), ds, "aap")
|
|
dm2 := NewDRWMutex(context.Background(), ds, "noot")
|
|
|
|
dm1.Lock(id, source)
|
|
dm2.Lock(id, source)
|
|
|
|
// fmt.Println("Both locks acquired, waiting...")
|
|
time.Sleep(2500 * time.Millisecond)
|
|
|
|
dm1.Unlock()
|
|
dm2.Unlock()
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
|
|
// Borrowed from mutex_test.go
|
|
func HammerMutex(m *DRWMutex, loops int, cdone chan bool) {
|
|
for i := 0; i < loops; i++ {
|
|
m.Lock(id, source)
|
|
m.Unlock()
|
|
}
|
|
cdone <- true
|
|
}
|
|
|
|
// Borrowed from mutex_test.go
|
|
func TestMutex(t *testing.T) {
|
|
c := make(chan bool)
|
|
m := NewDRWMutex(context.Background(), ds, "test")
|
|
for i := 0; i < 10; i++ {
|
|
go HammerMutex(m, 1000, c)
|
|
}
|
|
for i := 0; i < 10; i++ {
|
|
<-c
|
|
}
|
|
}
|
|
|
|
func BenchmarkMutexUncontended(b *testing.B) {
|
|
type PaddedMutex struct {
|
|
*DRWMutex
|
|
}
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var mu = PaddedMutex{NewDRWMutex(context.Background(), ds, "")}
|
|
for pb.Next() {
|
|
mu.Lock(id, source)
|
|
mu.Unlock()
|
|
}
|
|
})
|
|
}
|
|
|
|
func benchmarkMutex(b *testing.B, slack, work bool) {
|
|
mu := NewDRWMutex(context.Background(), ds, "")
|
|
if slack {
|
|
b.SetParallelism(10)
|
|
}
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
foo := 0
|
|
for pb.Next() {
|
|
mu.Lock(id, source)
|
|
mu.Unlock()
|
|
if work {
|
|
for i := 0; i < 100; i++ {
|
|
foo *= 2
|
|
foo /= 2
|
|
}
|
|
}
|
|
}
|
|
_ = foo
|
|
})
|
|
}
|
|
|
|
func BenchmarkMutex(b *testing.B) {
|
|
benchmarkMutex(b, false, false)
|
|
}
|
|
|
|
func BenchmarkMutexSlack(b *testing.B) {
|
|
benchmarkMutex(b, true, false)
|
|
}
|
|
|
|
func BenchmarkMutexWork(b *testing.B) {
|
|
benchmarkMutex(b, false, true)
|
|
}
|
|
|
|
func BenchmarkMutexWorkSlack(b *testing.B) {
|
|
benchmarkMutex(b, true, true)
|
|
}
|
|
|
|
func BenchmarkMutexNoSpin(b *testing.B) {
|
|
// This benchmark models a situation where spinning in the mutex should be
|
|
// non-profitable and allows to confirm that spinning does not do harm.
|
|
// To achieve this we create excess of goroutines most of which do local work.
|
|
// These goroutines yield during local work, so that switching from
|
|
// a blocked goroutine to other goroutines is profitable.
|
|
// As a matter of fact, this benchmark still triggers some spinning in the mutex.
|
|
m := NewDRWMutex(context.Background(), ds, "")
|
|
var acc0, acc1 uint64
|
|
b.SetParallelism(4)
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
c := make(chan bool)
|
|
var data [4 << 10]uint64
|
|
for i := 0; pb.Next(); i++ {
|
|
if i%4 == 0 {
|
|
m.Lock(id, source)
|
|
acc0 -= 100
|
|
acc1 += 100
|
|
m.Unlock()
|
|
} else {
|
|
for i := 0; i < len(data); i += 4 {
|
|
data[i]++
|
|
}
|
|
// Elaborate way to say runtime.Gosched
|
|
// that does not put the goroutine onto global runq.
|
|
go func() {
|
|
c <- true
|
|
}()
|
|
<-c
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func BenchmarkMutexSpin(b *testing.B) {
|
|
// This benchmark models a situation where spinning in the mutex should be
|
|
// profitable. To achieve this we create a goroutine per-proc.
|
|
// These goroutines access considerable amount of local data so that
|
|
// unnecessary rescheduling is penalized by cache misses.
|
|
m := NewDRWMutex(context.Background(), ds, "")
|
|
var acc0, acc1 uint64
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
var data [16 << 10]uint64
|
|
for i := 0; pb.Next(); i++ {
|
|
m.Lock(id, source)
|
|
acc0 -= 100
|
|
acc1 += 100
|
|
m.Unlock()
|
|
for i := 0; i < len(data); i += 4 {
|
|
data[i]++
|
|
}
|
|
}
|
|
})
|
|
}
|