use unixNanoTime instead of time.Time in lockRequestorInfo (#20140)

Bonus: Skip Source, Quorum fields in lockArgs that are never
sent during Unlock() phase.
This commit is contained in:
Harshavardhana 2024-07-24 03:24:01 -07:00 committed by GitHub
parent 6fe2b3f901
commit 3b21bb5be8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 197 additions and 89 deletions

View File

@ -848,9 +848,10 @@ func (a adminAPIHandlers) DataUsageInfoHandler(w http.ResponseWriter, r *http.Re
}
func lriToLockEntry(l lockRequesterInfo, now time.Time, resource, server string) *madmin.LockEntry {
t := time.Unix(0, l.Timestamp)
entry := &madmin.LockEntry{
Timestamp: l.Timestamp,
Elapsed: now.Sub(l.Timestamp),
Timestamp: t,
Elapsed: now.Sub(t),
Resource: resource,
ServerList: []string{server},
Source: l.Source,

View File

@ -463,6 +463,7 @@ func TestTopLockEntries(t *testing.T) {
Owner: lri.Owner,
ID: lri.UID,
Quorum: lri.Quorum,
Timestamp: time.Unix(0, lri.Timestamp),
})
}

View File

@ -34,8 +34,8 @@ type lockRequesterInfo struct {
Name string // name of the resource lock was requested for
Writer bool // Bool whether write or read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastRefresh time.Time // Timestamp for last lock refresh.
Timestamp int64 // Timestamp set at the time of initialization.
TimeLastRefresh int64 // Timestamp for last lock refresh.
Source string // Contains line, function and filename requesting the lock.
Group bool // indicates if it was a group lock.
Owner string // Owner represents the UUID of the owner who originally requested the lock.
@ -87,6 +87,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
// No locks held on the all resources, so claim write
// lock on all resources at once.
now := UTCNow()
for i, resource := range args.Resources {
l.lockMap[resource] = []lockRequesterInfo{
{
@ -95,10 +96,10 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Timestamp: now.UnixNano(),
TimeLastRefresh: now.UnixNano(),
Group: len(args.Resources) > 1,
Quorum: args.Quorum,
Quorum: *args.Quorum,
idx: i,
},
}
@ -108,7 +109,7 @@ func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool
}
func formatUUID(s string, idx int) string {
return s + strconv.Itoa(idx)
return concat(s, strconv.Itoa(idx))
}
func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
@ -167,17 +168,19 @@ func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply boo
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0]
now := UTCNow()
lrInfo := lockRequesterInfo{
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Quorum: args.Quorum,
Timestamp: now.UnixNano(),
TimeLastRefresh: now.UnixNano(),
Quorum: *args.Quorum,
}
if lri, ok := l.lockMap[resource]; ok {
lri, ok := l.lockMap[resource]
if ok {
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
@ -351,9 +354,10 @@ func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refresh
delete(l.lockUID, formatUUID(args.UID, idx))
return idx > 0, nil
}
now := UTCNow()
for i := range lris {
if lris[i].UID == args.UID {
lris[i].TimeLastRefresh = UTCNow()
lris[i].TimeLastRefresh = now.UnixNano()
}
}
idx++
@ -376,7 +380,7 @@ func (l *localLocker) expireOldLocks(interval time.Duration) {
modified := false
for i := 0; i < len(lris); {
lri := &lris[i]
if time.Since(lri.TimeLastRefresh) > interval {
if time.Since(time.Unix(0, lri.TimeLastRefresh)) > interval {
delete(l.lockUID, formatUUID(lri.UID, lri.idx))
if len(lris) == 1 {
// Remove the write lock.

View File

@ -204,13 +204,13 @@ func (z *lockRequesterInfo) DecodeMsg(dc *msgp.Reader) (err error) {
return
}
case "Timestamp":
z.Timestamp, err = dc.ReadTime()
z.Timestamp, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
}
case "TimeLastRefresh":
z.TimeLastRefresh, err = dc.ReadTime()
z.TimeLastRefresh, err = dc.ReadInt64()
if err != nil {
err = msgp.WrapError(err, "TimeLastRefresh")
return
@ -288,7 +288,7 @@ func (z *lockRequesterInfo) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = en.WriteTime(z.Timestamp)
err = en.WriteInt64(z.Timestamp)
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
@ -298,7 +298,7 @@ func (z *lockRequesterInfo) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
err = en.WriteTime(z.TimeLastRefresh)
err = en.WriteInt64(z.TimeLastRefresh)
if err != nil {
err = msgp.WrapError(err, "TimeLastRefresh")
return
@ -361,10 +361,10 @@ func (z *lockRequesterInfo) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.AppendString(o, z.UID)
// string "Timestamp"
o = append(o, 0xa9, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70)
o = msgp.AppendTime(o, z.Timestamp)
o = msgp.AppendInt64(o, z.Timestamp)
// string "TimeLastRefresh"
o = append(o, 0xaf, 0x54, 0x69, 0x6d, 0x65, 0x4c, 0x61, 0x73, 0x74, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68)
o = msgp.AppendTime(o, z.TimeLastRefresh)
o = msgp.AppendInt64(o, z.TimeLastRefresh)
// string "Source"
o = append(o, 0xa6, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65)
o = msgp.AppendString(o, z.Source)
@ -417,13 +417,13 @@ func (z *lockRequesterInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
case "Timestamp":
z.Timestamp, bts, err = msgp.ReadTimeBytes(bts)
z.Timestamp, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "Timestamp")
return
}
case "TimeLastRefresh":
z.TimeLastRefresh, bts, err = msgp.ReadTimeBytes(bts)
z.TimeLastRefresh, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
err = msgp.WrapError(err, "TimeLastRefresh")
return
@ -466,7 +466,7 @@ func (z *lockRequesterInfo) UnmarshalMsg(bts []byte) (o []byte, err error) {
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *lockRequesterInfo) Msgsize() (s int) {
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 7 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.UID) + 10 + msgp.TimeSize + 16 + msgp.TimeSize + 7 + msgp.StringPrefixSize + len(z.Source) + 6 + msgp.BoolSize + 6 + msgp.StringPrefixSize + len(z.Owner) + 7 + msgp.IntSize
s = 1 + 5 + msgp.StringPrefixSize + len(z.Name) + 7 + msgp.BoolSize + 4 + msgp.StringPrefixSize + len(z.UID) + 10 + msgp.Int64Size + 16 + msgp.Int64Size + 7 + msgp.StringPrefixSize + len(z.Source) + 6 + msgp.BoolSize + 6 + msgp.StringPrefixSize + len(z.Owner) + 7 + msgp.IntSize
return
}

View File

@ -32,6 +32,7 @@ import (
func TestLocalLockerExpire(t *testing.T) {
wResources := make([]string, 1000)
rResources := make([]string, 1000)
quorum := 0
l := newLocker()
ctx := context.Background()
for i := range wResources {
@ -40,7 +41,7 @@ func TestLocalLockerExpire(t *testing.T) {
Resources: []string{mustGetUUID()},
Source: t.Name(),
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.Lock(ctx, arg)
if err != nil {
@ -58,14 +59,14 @@ func TestLocalLockerExpire(t *testing.T) {
Resources: []string{name},
Source: t.Name(),
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.RLock(ctx, arg)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("did not get write lock")
t.Fatal("did not get read lock")
}
// RLock twice
ok, err = l.RLock(ctx, arg)
@ -112,6 +113,7 @@ func TestLocalLockerUnlock(t *testing.T) {
rUIDs := make([]string, 0, n*2)
l := newLocker()
ctx := context.Background()
quorum := 0
for i := range wResources {
names := [m]string{}
for j := range names {
@ -123,7 +125,7 @@ func TestLocalLockerUnlock(t *testing.T) {
Resources: names[:],
Source: t.Name(),
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.Lock(ctx, arg)
if err != nil {
@ -144,7 +146,7 @@ func TestLocalLockerUnlock(t *testing.T) {
Resources: []string{name},
Source: t.Name(),
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.RLock(ctx, arg)
if err != nil {
@ -183,7 +185,7 @@ func TestLocalLockerUnlock(t *testing.T) {
Resources: []string{name},
Source: t.Name(),
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.RUnlock(ctx, arg)
if err != nil {
@ -203,6 +205,7 @@ func TestLocalLockerUnlock(t *testing.T) {
if len(l.lockUID) != len(rResources)+len(wResources)*m {
t.Fatalf("lockUID len, got %d, want %d + %d", len(l.lockUID), len(rResources), len(wResources)*m)
}
// RUnlock again, different uids
for i, name := range rResources {
arg := dsync.LockArgs{
@ -210,7 +213,7 @@ func TestLocalLockerUnlock(t *testing.T) {
Resources: []string{name},
Source: "minio",
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.RUnlock(ctx, arg)
if err != nil {
@ -238,7 +241,7 @@ func TestLocalLockerUnlock(t *testing.T) {
Resources: names[:],
Source: "minio",
Owner: "owner",
Quorum: 0,
Quorum: &quorum,
}
ok, err := l.Unlock(ctx, arg)
if err != nil {
@ -261,6 +264,7 @@ func TestLocalLockerUnlock(t *testing.T) {
func Test_localLocker_expireOldLocksExpire(t *testing.T) {
rng := rand.New(rand.NewSource(0))
quorum := 0
// Numbers of unique locks
for _, locks := range []int{100, 1000, 1e6} {
if testing.Short() && locks > 100 {
@ -289,7 +293,7 @@ func Test_localLocker_expireOldLocksExpire(t *testing.T) {
Resources: res,
Source: hex.EncodeToString(tmp[:8]),
Owner: hex.EncodeToString(tmp[8:]),
Quorum: 0,
Quorum: &quorum,
})
if !ok || err != nil {
t.Fatal("failed:", err, ok)
@ -311,7 +315,7 @@ func Test_localLocker_expireOldLocksExpire(t *testing.T) {
for _, v := range l.lockMap {
for i := range v {
if rng.Intn(2) == 0 {
v[i].TimeLastRefresh = expired
v[i].TimeLastRefresh = expired.UnixNano()
}
}
}
@ -347,6 +351,7 @@ func Test_localLocker_expireOldLocksExpire(t *testing.T) {
func Test_localLocker_RUnlock(t *testing.T) {
rng := rand.New(rand.NewSource(0))
quorum := 0
// Numbers of unique locks
for _, locks := range []int{1, 100, 1000, 1e6} {
if testing.Short() && locks > 100 {
@ -375,7 +380,7 @@ func Test_localLocker_RUnlock(t *testing.T) {
Resources: res,
Source: hex.EncodeToString(tmp[:8]),
Owner: hex.EncodeToString(tmp[8:]),
Quorum: 0,
Quorum: &quorum,
})
if !ok || err != nil {
t.Fatal("failed:", err, ok)

View File

@ -63,15 +63,15 @@ func TestLockRpcServerRemoveEntry(t *testing.T) {
Owner: "owner",
Writer: true,
UID: "0123-4567",
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Timestamp: UTCNow().UnixNano(),
TimeLastRefresh: UTCNow().UnixNano(),
}
lockRequesterInfo2 := lockRequesterInfo{
Owner: "owner",
Writer: true,
UID: "89ab-cdef",
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Timestamp: UTCNow().UnixNano(),
TimeLastRefresh: UTCNow().UnixNano(),
}
locker.ll.lockMap["name"] = []lockRequesterInfo{

View File

@ -34,6 +34,7 @@ import (
"sync"
"time"
"unicode/utf8"
"unsafe"
"github.com/google/uuid"
"github.com/klauspost/compress/s2"
@ -246,6 +247,24 @@ func pathsJoinPrefix(prefix string, elem ...string) (paths []string) {
return paths
}
// string concat alternative to s1 + s2 with low overhead.
func concat(ss ...string) string {
length := len(ss)
if length == 0 {
return ""
}
// create & allocate the memory in advance.
n := 0
for i := 0; i < length; i++ {
n += len(ss[i])
}
b := make([]byte, 0, n)
for i := 0; i < length; i++ {
b = append(b, ss[i]...)
}
return unsafe.String(unsafe.SliceData(b), n)
}
// pathJoin - like path.Join() but retains trailing SlashSeparator of the last element
func pathJoin(elem ...string) string {
sb := bytebufferpool.Get()

View File

@ -20,8 +20,10 @@ package cmd
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"path"
@ -47,6 +49,43 @@ func pathJoinOld(elem ...string) string {
return path.Join(elem...) + trailingSlash
}
func concatNaive(ss ...string) string {
rs := ss[0]
for i := 1; i < len(ss); i++ {
rs += ss[i]
}
return rs
}
func benchmark(b *testing.B, data []string) {
b.Run("concat naive", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
concatNaive(data...)
}
})
b.Run("concat fast", func(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
concat(data...)
}
})
}
func BenchmarkConcatImplementation(b *testing.B) {
data := make([]string, 2)
rng := rand.New(rand.NewSource(0))
for i := 0; i < 2; i++ {
var tmp [16]byte
rng.Read(tmp[:])
data[i] = hex.EncodeToString(tmp[:])
}
b.ResetTimer()
benchmark(b, data)
}
func BenchmarkPathJoinOld(b *testing.B) {
b.Run("PathJoin", func(b *testing.B) {
b.ResetTimer()

View File

@ -433,7 +433,7 @@ func lock(ctx context.Context, ds *Dsync, locks *[]string, id, source string, is
UID: id,
Resources: names,
Source: source,
Quorum: quorum,
Quorum: &quorum,
}
// Combined timeout for the lock attempt.

View File

@ -27,16 +27,16 @@ type LockArgs struct {
// Resources contains single or multiple entries to be locked/unlocked.
Resources []string
// Source contains the line number, function and file name of the code
// on the client node that requested the lock.
Source string
// Owner represents unique ID for this instance, an owner who originally requested
// the locked resource, useful primarily in figuring out stale locks.
Owner string
// Source contains the line number, function and file name of the code
// on the client node that requested the lock.
Source string `msgp:"omitempty"`
// Quorum represents the expected quorum for this lock type.
Quorum int
Quorum *int `msgp:"omitempty"`
}
// ResponseCode is the response code for a locking request.

View File

@ -49,24 +49,36 @@ func (z *LockArgs) DecodeMsg(dc *msgp.Reader) (err error) {
return
}
}
case "Source":
z.Source, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
case "Owner":
z.Owner, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Owner")
return
}
case "Source":
z.Source, err = dc.ReadString()
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
case "Quorum":
z.Quorum, err = dc.ReadInt()
if dc.IsNil() {
err = dc.ReadNil()
if err != nil {
err = msgp.WrapError(err, "Quorum")
return
}
z.Quorum = nil
} else {
if z.Quorum == nil {
z.Quorum = new(int)
}
*z.Quorum, err = dc.ReadInt()
if err != nil {
err = msgp.WrapError(err, "Quorum")
return
}
}
default:
err = dc.Skip()
if err != nil {
@ -108,16 +120,6 @@ func (z *LockArgs) EncodeMsg(en *msgp.Writer) (err error) {
return
}
}
// write "Source"
err = en.Append(0xa6, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Source)
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
// write "Owner"
err = en.Append(0xa5, 0x4f, 0x77, 0x6e, 0x65, 0x72)
if err != nil {
@ -128,16 +130,33 @@ func (z *LockArgs) EncodeMsg(en *msgp.Writer) (err error) {
err = msgp.WrapError(err, "Owner")
return
}
// write "Source"
err = en.Append(0xa6, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65)
if err != nil {
return
}
err = en.WriteString(z.Source)
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
// write "Quorum"
err = en.Append(0xa6, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d)
if err != nil {
return
}
err = en.WriteInt(z.Quorum)
if z.Quorum == nil {
err = en.WriteNil()
if err != nil {
return
}
} else {
err = en.WriteInt(*z.Quorum)
if err != nil {
err = msgp.WrapError(err, "Quorum")
return
}
}
return
}
@ -154,15 +173,19 @@ func (z *LockArgs) MarshalMsg(b []byte) (o []byte, err error) {
for za0001 := range z.Resources {
o = msgp.AppendString(o, z.Resources[za0001])
}
// string "Source"
o = append(o, 0xa6, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65)
o = msgp.AppendString(o, z.Source)
// string "Owner"
o = append(o, 0xa5, 0x4f, 0x77, 0x6e, 0x65, 0x72)
o = msgp.AppendString(o, z.Owner)
// string "Source"
o = append(o, 0xa6, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65)
o = msgp.AppendString(o, z.Source)
// string "Quorum"
o = append(o, 0xa6, 0x51, 0x75, 0x6f, 0x72, 0x75, 0x6d)
o = msgp.AppendInt(o, z.Quorum)
if z.Quorum == nil {
o = msgp.AppendNil(o)
} else {
o = msgp.AppendInt(o, *z.Quorum)
}
return
}
@ -209,24 +232,35 @@ func (z *LockArgs) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
}
case "Source":
z.Source, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
case "Owner":
z.Owner, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Owner")
return
}
case "Source":
z.Source, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Source")
return
}
case "Quorum":
z.Quorum, bts, err = msgp.ReadIntBytes(bts)
if msgp.IsNil(bts) {
bts, err = msgp.ReadNilBytes(bts)
if err != nil {
return
}
z.Quorum = nil
} else {
if z.Quorum == nil {
z.Quorum = new(int)
}
*z.Quorum, bts, err = msgp.ReadIntBytes(bts)
if err != nil {
err = msgp.WrapError(err, "Quorum")
return
}
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
@ -245,7 +279,12 @@ func (z *LockArgs) Msgsize() (s int) {
for za0001 := range z.Resources {
s += msgp.StringPrefixSize + len(z.Resources[za0001])
}
s += 7 + msgp.StringPrefixSize + len(z.Source) + 6 + msgp.StringPrefixSize + len(z.Owner) + 7 + msgp.IntSize
s += 6 + msgp.StringPrefixSize + len(z.Owner) + 7 + msgp.StringPrefixSize + len(z.Source) + 7
if z.Quorum == nil {
s += msgp.NilSize
} else {
s += msgp.IntSize
}
return
}