-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlocker.go
63 lines (50 loc) · 1.38 KB
/
locker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package pgutil
import (
"context"
"errors"
"math"
"github.com/segmentio/fasthash/fnv1"
)
// TransactionalLocker runs callbacks inside a database transaction that holds
// a PostgreSQL transaction-scoped advisory lock. Locks are addressed by the
// two-integer form (namespace, key) of the advisory lock functions.
type TransactionalLocker struct {
// db is the handle used to open the transaction in which the lock is taken.
db DB
// namespace is passed as the first argument of every advisory lock call
// made by this locker; the per-call key is the second argument.
namespace int32
}
// ErrInTransaction is returned by NewTransactionalLocker when the supplied
// database handle reports that it is already inside a transaction.
var ErrInTransaction = errors.New("locker database must not be in transaction")
// StringKey derives an int32 lock key from an arbitrary string. The string is
// hashed with 32-bit FNV-1 and reduced modulo math.MaxInt32, so the result is
// always in the range [0, math.MaxInt32) and never negative.
func StringKey(key string) int32 {
	sum := fnv1.HashString32(key)
	reduced := sum % math.MaxInt32
	return int32(reduced)
}
// NewTransactionalLocker builds a locker that issues advisory locks in the
// given namespace through db. It returns ErrInTransaction (and a nil locker)
// when db reports that it is already inside a transaction.
func NewTransactionalLocker(db DB, namespace int32) (*TransactionalLocker, error) {
	if db.IsInTransaction() {
		return nil, ErrInTransaction
	}

	return &TransactionalLocker{db: db, namespace: namespace}, nil
}
// WithLock opens a transaction on the locker's database, takes the advisory
// lock identified by (namespace, key) via pg_advisory_xact_lock, and then
// invokes f with the transaction handle. If taking the lock fails, that error
// is returned and f is not called; otherwise the result of f is returned to
// WithTransaction, whose result is returned to the caller.
//
// Per the PostgreSQL documentation, pg_advisory_xact_lock waits until the lock
// is available and the lock is released automatically when the transaction
// ends.
func (l *TransactionalLocker) WithLock(ctx context.Context, key int32, f func(tx DB) error) error {
	locked := func(tx DB) error {
		// Build the statement inside the callback so it is constructed
		// exactly when the transaction body runs.
		stmt := Query("SELECT pg_advisory_xact_lock({:namespace}, {:key})", Args{
			"namespace": l.namespace,
			"key":       key,
		})

		lockErr := tx.Exec(ctx, stmt)
		if lockErr != nil {
			return lockErr
		}

		return f(tx)
	}

	return l.db.WithTransaction(ctx, locked)
}
// TryWithLock opens a transaction on the locker's database and attempts to
// take the advisory lock identified by (namespace, key) via
// pg_try_advisory_xact_lock. When the lock is obtained, f is invoked with the
// transaction handle; when it is not, f is skipped and the transaction body
// finishes without error.
//
// The acquired result reports the boolean scanned from the lock query. The
// error result is whatever WithTransaction returns, which includes a failure
// of the lock query or an error returned by f.
func (l *TransactionalLocker) TryWithLock(ctx context.Context, key int32, f func(tx DB) error) (acquired bool, _ error) {
	attempt := func(tx DB) error {
		var scanErr error

		// The query call is passed straight into ScanBool so that all of its
		// results are forwarded. The middle result of ScanBool is discarded
		// (presumably a "row was present" flag; verify against ScanBool).
		acquired, _, scanErr = ScanBool(tx.Query(ctx, Query("SELECT pg_try_advisory_xact_lock({:namespace}, {:key})", Args{
			"namespace": l.namespace,
			"key":       key,
		})))

		switch {
		case scanErr != nil:
			return scanErr
		case !acquired:
			// Lock is held elsewhere: nothing to do, and not an error.
			return nil
		default:
			return f(tx)
		}
	}

	// Run the transaction before reading acquired: the callback above is what
	// assigns it, so the result must be captured in a separate statement.
	txErr := l.db.WithTransaction(ctx, attempt)
	return acquired, txErr
}