// Source: minio/cmd/local-locker.go

// Copyright (c) 2015-2021 MinIO, Inc.
//
// This file is part of MinIO Object Storage stack
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
//go:generate msgp -file=$GOFILE -unexported
import (
"context"
"fmt"
"strconv"
"sync"
"time"
"github.com/minio/minio/internal/dsync"
)
// lockRequesterInfo stores various info from the client for each lock that is requested.
// One value is kept per holder in localLocker.lockMap: a write lock is stored
// as a single entry with Writer set, read locks as one entry per reader.
type lockRequesterInfo struct {
Name string // Name of the resource the lock was requested for.
Writer bool // True for a write lock, false for a read lock.
UID string // UID to uniquely identify request of client.
Timestamp time.Time // Timestamp set at the time of initialization.
TimeLastRefresh time.Time // Timestamp for last lock refresh.
Source string // Contains line, function and filename requesting the lock.
Group bool // Indicates if it was a group lock (request covered more than one resource).
Owner string // Owner represents the UUID of the owner who originally requested the lock.
Quorum int // Quorum represents the quorum required for this lock to be active.
idx int `msg:"-"` // Position of the resource within the request's Resources list; combined with UID to form the lockUID key. Left at zero for read locks.
}
// isWriteLock reports whether the holder list describes a write lock, i.e.
// it contains exactly one entry and that entry has Writer set.
func isWriteLock(lri []lockRequesterInfo) bool {
	if len(lri) != 1 {
		return false
	}
	return lri[0].Writer
}
// localLocker implements Dsync.NetLocker
// It keeps all lock state in two in-memory maps protected by mutex.
//
//msgp:ignore localLocker
type localLocker struct {
mutex sync.Mutex // Guards lockMap and lockUID.
lockMap map[string][]lockRequesterInfo // Resource name -> current holders of the lock on it.
lockUID map[string]string // UUID -> resource map.
}
// String returns the value reported by globalEndpoints.Localhost().
func (l *localLocker) String() string {
return globalEndpoints.Localhost()
}
// canTakeLock reports whether none of the given resources currently has an
// entry in lockMap. It does not lock l.mutex itself.
func (l *localLocker) canTakeLock(resources ...string) bool {
	for i := range resources {
		if _, held := l.lockMap[resources[i]]; held {
			return false
		}
	}
	return true
}
// Lock tries to take a write lock on every resource in args.Resources.
//
// The request is all-or-nothing: if any resource already has an entry in
// lockMap, nothing is claimed and (false, nil) is returned. An error is
// returned only when more than maxDeleteList resources are passed.
// ctx is not consulted.
func (l *localLocker) Lock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
	if len(args.Resources) > maxDeleteList {
		return false, fmt.Errorf("internal error: localLocker.Lock called with more than %d resources", maxDeleteList)
	}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	// Reject the whole request if even one resource is already locked.
	if !l.canTakeLock(args.Resources...) {
		return false, nil
	}

	// Every resource is free: claim the write lock on each of them.
	isGroup := len(args.Resources) > 1
	for pos := range args.Resources {
		name := args.Resources[pos]
		holder := lockRequesterInfo{
			Name:            name,
			Writer:          true,
			Source:          args.Source,
			Owner:           args.Owner,
			UID:             args.UID,
			Timestamp:       UTCNow(),
			TimeLastRefresh: UTCNow(),
			Group:           isGroup,
			Quorum:          args.Quorum,
			idx:             pos,
		}
		l.lockMap[name] = []lockRequesterInfo{holder}
		// Index the lock by UID + position so it can be found without the name.
		l.lockUID[formatUUID(args.UID, pos)] = name
	}
	return true, nil
}
// formatUUID builds a lockUID key by appending the decimal form of idx to s.
func formatUUID(s string, idx int) string {
	suffix := strconv.FormatInt(int64(idx), 10)
	return s + suffix
}
// Unlock releases the write locks identified by args on each of
// args.Resources.
//
// Resources with no entry in lockMap are skipped. A resource whose holder
// list is not a write lock sets err and is skipped, but the remaining
// resources are still processed. reply is true when at least one entry was
// removed.
func (l *localLocker) Unlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
	if len(args.Resources) > maxDeleteList {
		return false, fmt.Errorf("internal error: localLocker.Unlock called with more than %d resources", maxDeleteList)
	}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	for _, resource := range args.Resources {
		holders, held := l.lockMap[resource]
		if !held {
			continue
		}
		if !isWriteLock(holders) {
			// Only write locks may be released through Unlock.
			err = fmt.Errorf("unlock attempted on a read locked entity: %s", resource)
			continue
		}
		if l.removeEntry(resource, args, &holders) {
			reply = true
		}
	}
	return reply, err
}
// removeEntry deletes the first holder in *lri whose UID equals args.UID and,
// when args.Owner is non-empty, whose Owner equals args.Owner.
//
// If that holder is the only one, the whole lockMap entry for name is
// dropped; otherwise the holder is cut out of the slice and the shortened
// slice is stored back. The matching lockUID key is removed as well.
// It returns false when no holder matched.
func (l *localLocker) removeEntry(name string, args dsync.LockArgs, lri *[]lockRequesterInfo) bool {
	holders := *lri
	for pos := range holders {
		candidate := holders[pos] // copy: the slice may be shifted below
		if candidate.UID != args.UID {
			continue
		}
		if args.Owner != "" && candidate.Owner != args.Owner {
			continue
		}

		if len(holders) == 1 {
			// Last holder: remove the map entry altogether.
			delete(l.lockMap, name)
		} else {
			// Cut this holder out and keep the rest.
			*lri = append(holders[:pos], holders[pos+1:]...)
			l.lockMap[name] = *lri
		}
		delete(l.lockUID, formatUUID(args.UID, candidate.idx))
		return true
	}
	// No match; the entry may have been removed by an earlier call.
	return false
}
func (l *localLocker) RLock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
perf: websocket grid connectivity for all internode communication (#18461) This PR adds a WebSocket grid feature that allows servers to communicate via a single two-way connection. There are two request types: * Single requests, which are `[]byte => ([]byte, error)`. This is for efficient small roundtrips with small payloads. * Streaming requests which are `[]byte, chan []byte => chan []byte (and error)`, which allows for different combinations of full two-way streams with an initial payload. Only a single stream is created between two machines - and there is, as such, no server/client relation since both sides can initiate and handle requests. Which server initiates the request is decided deterministically on the server names. Requests are made through a mux client and server, which handles message passing, congestion, cancelation, timeouts, etc. If a connection is lost, all requests are canceled, and the calling server will try to reconnect. Registered handlers can operate directly on byte slices or use a higher-level generics abstraction. There is no versioning of handlers/clients, and incompatible changes should be handled by adding new handlers. The request path can be changed to a new one for any protocol changes. First, all servers create a "Manager." The manager must know its address as well as all remote addresses. This will manage all connections. To get a connection to any remote, ask the manager to provide it given the remote address using. ``` func (m *Manager) Connection(host string) *Connection ``` All serverside handlers must also be registered on the manager. This will make sure that all incoming requests are served. The number of in-flight requests and responses must also be given for streaming requests. The "Connection" returned manages the mux-clients. Requests issued to the connection will be sent to the remote. 
* `func (c *Connection) Request(ctx context.Context, h HandlerID, req []byte) ([]byte, error)` performs a single request and returns the result. Any deadline provided on the request is forwarded to the server, and canceling the context will make the function return at once. * `func (c *Connection) NewStream(ctx context.Context, h HandlerID, payload []byte) (st *Stream, err error)` will initiate a remote call and send the initial payload. ```Go // A Stream is a two-way stream. // All responses *must* be read by the caller. // If the call is canceled through the context, //The appropriate error will be returned. type Stream struct { // Responses from the remote server. // Channel will be closed after an error or when the remote closes. // All responses *must* be read by the caller until either an error is returned or the channel is closed. // Canceling the context will cause the context cancellation error to be returned. Responses <-chan Response // Requests sent to the server. // If the handler is defined with 0 incoming capacity this will be nil. // Channel *must* be closed to signal the end of the stream. // If the request context is canceled, the stream will no longer process requests. Requests chan<- []byte } type Response struct { Msg []byte Err error } ``` There are generic versions of the server/client handlers that allow the use of type safe implementations for data types that support msgpack marshal/unmarshal.
2023-11-20 20:09:35 -05:00
if len(args.Resources) != 1 {
return false, fmt.Errorf("internal error: localLocker.RLock called with more than one resource")
}
l.mutex.Lock()
defer l.mutex.Unlock()
resource := args.Resources[0]
lrInfo := lockRequesterInfo{
Name: resource,
Writer: false,
Source: args.Source,
Owner: args.Owner,
UID: args.UID,
Timestamp: UTCNow(),
TimeLastRefresh: UTCNow(),
Quorum: args.Quorum,
}
if lri, ok := l.lockMap[resource]; ok {
if reply = !isWriteLock(lri); reply {
// Unless there is a write lock
l.lockMap[resource] = append(l.lockMap[resource], lrInfo)
l.lockUID[formatUUID(args.UID, 0)] = resource
}
} else {
// No locks held on the given name, so claim (first) read lock
l.lockMap[resource] = []lockRequesterInfo{lrInfo}
l.lockUID[formatUUID(args.UID, 0)] = resource
reply = true
}
return reply, nil
}
// RUnlock releases the read lock identified by args on the single resource in
// args.Resources.
//
// It returns (true, nil) when the resource has no entry in lockMap or is held
// by read locks, and an error when the resource is write locked or when
// args.Resources does not contain exactly one resource.
func (l *localLocker) RUnlock(_ context.Context, args dsync.LockArgs) (reply bool, err error) {
	// Require exactly one resource. An empty list must be rejected here too,
	// otherwise indexing args.Resources[0] below would panic.
	if len(args.Resources) != 1 {
		return false, fmt.Errorf("internal error: localLocker.RUnlock called with more than one resource")
	}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	resource := args.Resources[0]
	lri, ok := l.lockMap[resource]
	if !ok {
		// No lock is held on the given name.
		return true, nil
	}
	if isWriteLock(lri) {
		// A write-lock is held, cannot release a read lock.
		return false, fmt.Errorf("RUnlock attempted on a write locked entity: %s", resource)
	}

	// The result is deliberately not propagated: the call reports success
	// even if no holder matched args.
	l.removeEntry(resource, args, &lri)
	return true, nil
}
// lockStats is a summary of the locks held by a localLocker, as computed by
// its stats method.
type lockStats struct {
	Total  int // Number of resources with an entry in the lock map.
	Writes int // Number of resources whose first holder is a writer.
	Reads  int // Total number of holders on resources whose first holder is a reader.
}
// stats returns a snapshot of how many resources are locked and how the
// holders split between writers and readers. Resources with an empty holder
// list count towards Total only.
func (l *localLocker) stats() lockStats {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	var st lockStats
	st.Total = len(l.lockMap)
	for _, holders := range l.lockMap {
		switch {
		case len(holders) == 0:
			// Nothing to count for an empty holder list.
		case holders[0].Writer:
			st.Writes++
		default:
			st.Reads += len(holders)
		}
	}
	return st
}
// localLockMap maps a resource name to the holders of the lock on it; it is
// the type returned by DupLockMap.
type localLockMap map[string][]lockRequesterInfo
func (l *localLocker) DupLockMap() localLockMap {
l.mutex.Lock()
defer l.mutex.Unlock()
lockCopy := make(map[string][]lockRequesterInfo, len(l.lockMap))
for k, v := range l.lockMap {
2023-04-04 00:23:24 -04:00
if len(v) == 0 {
delete(l.lockMap, k)
continue
}
lockCopy[k] = append(make([]lockRequesterInfo, 0, len(v)), v...)
}
return lockCopy
}
// Close does nothing for the local locker and always returns nil.
func (l *localLocker) Close() error {
return nil
}
// IsOnline - local locker is always online.
2019-11-19 20:42:27 -05:00
func (l *localLocker) IsOnline() bool {
return true
}
// IsLocal reports whether this locker is local; for localLocker it always
// returns true.
func (l *localLocker) IsLocal() bool {
return true
}
// ForceUnlock removes locks regardless of their owner.
//
// With an empty args.UID every holder of every resource in args.Resources is
// removed and the call returns (true, nil). With a non-empty args.UID the
// lockUID index is walked for that UID (positions 0, 1, 2, ...) and each
// referenced lock is removed; the result is true if at least one index entry
// existed. If ctx is already done, ctx.Err() is returned and nothing is
// modified.
func (l *localLocker) ForceUnlock(ctx context.Context, args dsync.LockArgs) (reply bool, err error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	default:
	}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	if args.UID == "" {
		for _, resource := range args.Resources {
			holders, found := l.lockMap[resource]
			if !found {
				continue
			}

			// Snapshot the UIDs first: removeEntry shifts the holder slice.
			uids := make([]string, len(holders))
			for i := range holders {
				uids[i] = holders[i].UID
			}

			for _, uid := range uids {
				current, stillHeld := l.lockMap[resource]
				if stillHeld {
					l.removeEntry(resource, dsync.LockArgs{UID: uid}, &current)
					continue
				}
				// The resource entry is already gone; to be safe, purge any
				// index keys left behind for this UID.
				for pos := 0; pos < maxDeleteList; pos++ {
					key := formatUUID(uid, pos)
					if _, present := l.lockUID[key]; !present {
						break
					}
					delete(l.lockUID, key)
				}
			}
		}
		return true, nil
	}

	for pos := 0; ; pos++ {
		key := formatUUID(args.UID, pos)
		resource, known := l.lockUID[key]
		if !known {
			// End of the index chain for this UID.
			return pos > 0, nil
		}
		if current, held := l.lockMap[resource]; held {
			l.removeEntry(resource, dsync.LockArgs{UID: args.UID}, &current)
		} else {
			// Unexpected inconsistency: index entry without a lock; delete it.
			delete(l.lockUID, key)
		}
	}
}
// Refresh updates TimeLastRefresh on every lock entry held under args.UID.
//
// The lockUID index is walked for positions 0, 1, 2, ... of the UID. The
// result is false when the UID is unknown, and true once at least one
// position has been processed. If an index entry points at a resource that
// has no lock entry, that index entry is deleted and the walk stops. If ctx
// is already done, ctx.Err() is returned and nothing is modified.
func (l *localLocker) Refresh(ctx context.Context, args dsync.LockArgs) (refreshed bool, err error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	default:
	}

	l.mutex.Lock()
	defer l.mutex.Unlock()

	for pos := 0; ; pos++ {
		key := formatUUID(args.UID, pos)
		resource, known := l.lockUID[key]
		if !known {
			// Position 0 missing: the UID is not active. A later position
			// missing: every resource of the UID has been handled.
			return pos > 0, nil
		}

		holders, held := l.lockMap[resource]
		if !held {
			// Inconsistent. Delete UID.
			delete(l.lockUID, key)
			return pos > 0, nil
		}

		for i := range holders {
			if holders[i].UID == args.UID {
				holders[i].TimeLastRefresh = UTCNow()
			}
		}
	}
}
// Similar to removeEntry but only removes an entry only if the lock entry exists in map.
// Caller must hold 'l.mutex' lock.
func (l *localLocker) expireOldLocks(interval time.Duration) {
l.mutex.Lock()
defer l.mutex.Unlock()
Optimize read locker cleanup (#14200) When objects hold a lot of read locks cleanup time grows exponentially. ``` BEFORE: Unable to complete tests. AFTER: === RUN Test_localLocker_expireOldLocksExpire/100-locks/1-read local-locker_test.go:298: Scan Took: 0s. Left: 100/100 local-locker_test.go:317: Expire 50% took: 0s. Left: 44/44 local-locker_test.go:331: Expire rest took: 0s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/100-read local-locker_test.go:298: Scan Took: 0s. Left: 10000/100 local-locker_test.go:317: Expire 50% took: 1ms. Left: 5000/100 local-locker_test.go:331: Expire rest took: 1ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/1000-read local-locker_test.go:298: Scan Took: 2ms. Left: 100000/100 local-locker_test.go:317: Expire 50% took: 55ms. Left: 50038/100 local-locker_test.go:331: Expire rest took: 29ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1-read local-locker_test.go:298: Scan Took: 1ms. Left: 10000/10000 local-locker_test.go:317: Expire 50% took: 2ms. Left: 5019/5019 local-locker_test.go:331: Expire rest took: 2ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/100-read local-locker_test.go:298: Scan Took: 23ms. Left: 1000000/10000 local-locker_test.go:317: Expire 50% took: 160ms. Left: 499798/10000 local-locker_test.go:331: Expire rest took: 138ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1000-read local-locker_test.go:298: Scan Took: 200ms. Left: 10000000/10000 local-locker_test.go:317: Expire 50% took: 5.888s. Left: 5000196/10000 local-locker_test.go:331: Expire rest took: 3.417s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/1000000-locks/1-read local-locker_test.go:298: Scan Took: 133ms. Left: 1000000/1000000 local-locker_test.go:317: Expire 50% took: 348ms. Left: 500255/500255 local-locker_test.go:331: Expire rest took: 307ms. Left: 0/0 ```
2022-01-27 17:10:57 -05:00
for k, lris := range l.lockMap {
modified := false
for i := 0; i < len(lris); {
lri := &lris[i]
if time.Since(lri.TimeLastRefresh) > interval {
delete(l.lockUID, formatUUID(lri.UID, lri.idx))
if len(lris) == 1 {
// Remove the write lock.
delete(l.lockMap, lri.Name)
modified = false
break
}
Optimize read locker cleanup (#14200) When objects hold a lot of read locks cleanup time grows exponentially. ``` BEFORE: Unable to complete tests. AFTER: === RUN Test_localLocker_expireOldLocksExpire/100-locks/1-read local-locker_test.go:298: Scan Took: 0s. Left: 100/100 local-locker_test.go:317: Expire 50% took: 0s. Left: 44/44 local-locker_test.go:331: Expire rest took: 0s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/100-read local-locker_test.go:298: Scan Took: 0s. Left: 10000/100 local-locker_test.go:317: Expire 50% took: 1ms. Left: 5000/100 local-locker_test.go:331: Expire rest took: 1ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/1000-read local-locker_test.go:298: Scan Took: 2ms. Left: 100000/100 local-locker_test.go:317: Expire 50% took: 55ms. Left: 50038/100 local-locker_test.go:331: Expire rest took: 29ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1-read local-locker_test.go:298: Scan Took: 1ms. Left: 10000/10000 local-locker_test.go:317: Expire 50% took: 2ms. Left: 5019/5019 local-locker_test.go:331: Expire rest took: 2ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/100-read local-locker_test.go:298: Scan Took: 23ms. Left: 1000000/10000 local-locker_test.go:317: Expire 50% took: 160ms. Left: 499798/10000 local-locker_test.go:331: Expire rest took: 138ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1000-read local-locker_test.go:298: Scan Took: 200ms. Left: 10000000/10000 local-locker_test.go:317: Expire 50% took: 5.888s. Left: 5000196/10000 local-locker_test.go:331: Expire rest took: 3.417s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/1000000-locks/1-read local-locker_test.go:298: Scan Took: 133ms. Left: 1000000/1000000 local-locker_test.go:317: Expire 50% took: 348ms. Left: 500255/500255 local-locker_test.go:331: Expire rest took: 307ms. Left: 0/0 ```
2022-01-27 17:10:57 -05:00
modified = true
// Remove the appropriate lock.
lris = append(lris[:i], lris[i+1:]...)
// Check same i
} else {
// Move to next
i++
}
Optimize read locker cleanup (#14200) When objects hold a lot of read locks cleanup time grows exponentially. ``` BEFORE: Unable to complete tests. AFTER: === RUN Test_localLocker_expireOldLocksExpire/100-locks/1-read local-locker_test.go:298: Scan Took: 0s. Left: 100/100 local-locker_test.go:317: Expire 50% took: 0s. Left: 44/44 local-locker_test.go:331: Expire rest took: 0s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/100-read local-locker_test.go:298: Scan Took: 0s. Left: 10000/100 local-locker_test.go:317: Expire 50% took: 1ms. Left: 5000/100 local-locker_test.go:331: Expire rest took: 1ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/100-locks/1000-read local-locker_test.go:298: Scan Took: 2ms. Left: 100000/100 local-locker_test.go:317: Expire 50% took: 55ms. Left: 50038/100 local-locker_test.go:331: Expire rest took: 29ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1-read local-locker_test.go:298: Scan Took: 1ms. Left: 10000/10000 local-locker_test.go:317: Expire 50% took: 2ms. Left: 5019/5019 local-locker_test.go:331: Expire rest took: 2ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/100-read local-locker_test.go:298: Scan Took: 23ms. Left: 1000000/10000 local-locker_test.go:317: Expire 50% took: 160ms. Left: 499798/10000 local-locker_test.go:331: Expire rest took: 138ms. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/10000-locks/1000-read local-locker_test.go:298: Scan Took: 200ms. Left: 10000000/10000 local-locker_test.go:317: Expire 50% took: 5.888s. Left: 5000196/10000 local-locker_test.go:331: Expire rest took: 3.417s. Left: 0/0 === RUN Test_localLocker_expireOldLocksExpire/1000000-locks/1-read local-locker_test.go:298: Scan Took: 133ms. Left: 1000000/1000000 local-locker_test.go:317: Expire 50% took: 348ms. Left: 500255/500255 local-locker_test.go:331: Expire rest took: 307ms. Left: 0/0 ```
2022-01-27 17:10:57 -05:00
}
if modified {
l.lockMap[k] = lris
}
}
}
// newLocker returns a localLocker with both of its maps allocated, each
// pre-sized for 1000 entries.
func newLocker() *localLocker {
	const initialCapacity = 1000

	locker := new(localLocker)
	locker.lockMap = make(map[string][]lockRequesterInfo, initialCapacity)
	locker.lockUID = make(map[string]string, initialCapacity)
	return locker
}