From 80ffc5d5b6ef30a4efc7cfa90ec24e3a1e88337d Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Oct 2020 14:22:15 +0100 Subject: [PATCH 1/4] Parallelise checkUsingKeys --- keyring.go | 91 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/keyring.go b/keyring.go index 605c176c..a5ed7bb9 100644 --- a/keyring.go +++ b/keyring.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "runtime" "strings" "sync" "time" @@ -325,39 +326,69 @@ func (k *KeyRing) checkUsingKeys( requests []VerifyJSONRequest, results []VerifyJSONResult, keyIDs [][]KeyID, keys map[PublicKeyLookupRequest]PublicKeyLookupResult, ) { + procs := runtime.NumCPU() - 1 + if procs < 1 { + procs = 1 + } + jobs := make(map[int][]VerifyJSONRequest) for i := range requests { - if results[i].Error == nil { - // We've already checked this message and it passed the signature checks. - // So we can skip to the next message. - continue - } - for _, keyID := range keyIDs[i] { - serverKey, ok := keys[PublicKeyLookupRequest{requests[i].ServerName, keyID}] - if !ok { - // No key for this key ID so we continue onto the next key ID. - continue - } - if !serverKey.WasValidAt(requests[i].AtTS, requests[i].StrictValidityChecking) { - // The key wasn't valid at the timestamp we needed it to be valid at. - // So skip onto the next key. - results[i].Error = fmt.Errorf( - "gomatrixserverlib: key with ID %q for %q not valid at %d", - keyID, requests[i].ServerName, requests[i].AtTS, - ) - continue - } - if err := VerifyJSON( - string(requests[i].ServerName), keyID, ed25519.PublicKey(serverKey.Key), requests[i].Message, - ); err != nil { - // The signature wasn't valid, record the error and try the next key ID. - results[i].Error = err - continue + jobs[i%procs] = append(jobs[i%procs], requests[i]) + } + fmt.Println("Balancing jobs across", procs, "queues") + for i, r := range jobs { + fmt.Println("*", i, "has", len(r), "jobs") + } + var wg sync.WaitGroup + var mu sync.RWMutex + wg.Add(len(jobs)) + for _, r := range jobs { + go func(requests []VerifyJSONRequest) { + for i := range requests { + mu.RLock() + if results[i].Error == nil { + // We've already checked this message and it passed the signature checks. + // So we can skip to the next message. + mu.RUnlock() + continue + } + mu.RUnlock() + for _, keyID := range keyIDs[i] { + serverKey, ok := keys[PublicKeyLookupRequest{requests[i].ServerName, keyID}] + if !ok { + // No key for this key ID so we continue onto the next key ID. + continue + } + if !serverKey.WasValidAt(requests[i].AtTS, requests[i].StrictValidityChecking) { + // The key wasn't valid at the timestamp we needed it to be valid at. + // So skip onto the next key. + mu.Lock() + results[i].Error = fmt.Errorf( + "gomatrixserverlib: key with ID %q for %q not valid at %d", + keyID, requests[i].ServerName, requests[i].AtTS, + ) + mu.Unlock() + continue + } + if err := VerifyJSON( + string(requests[i].ServerName), keyID, ed25519.PublicKey(serverKey.Key), requests[i].Message, + ); err != nil { + // The signature wasn't valid, record the error and try the next key ID. + mu.Lock() + results[i].Error = err + mu.Unlock() + continue + } + // The signature is valid, set the result to nil. + mu.Lock() + results[i].Error = nil + mu.Unlock() + break + } } - // The signature is valid, set the result to nil. - results[i].Error = nil - break - } + wg.Done() + }(r) } + wg.Wait() } type KeyClient interface { From 3824fe28dfa988919862926abe3699891d57d811 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Oct 2020 14:31:25 +0100 Subject: [PATCH 2/4] Preserve original indexes --- keyring.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/keyring.go b/keyring.go index a5ed7bb9..a35f597c 100644 --- a/keyring.go +++ b/keyring.go @@ -330,9 +330,13 @@ func (k *KeyRing) checkUsingKeys( if procs < 1 { procs = 1 } - jobs := make(map[int][]VerifyJSONRequest) + type job struct { + index int + request VerifyJSONRequest + } + jobs := make(map[int][]job) for i := range requests { - jobs[i%procs] = append(jobs[i%procs], requests[i]) + jobs[i%procs] = append(jobs[i%procs], job{i, requests[i]}) } fmt.Println("Balancing jobs across", procs, "queues") for i, r := range jobs { @@ -341,52 +345,52 @@ func (k *KeyRing) checkUsingKeys( var wg sync.WaitGroup var mu sync.RWMutex wg.Add(len(jobs)) - for _, r := range jobs { - go func(requests []VerifyJSONRequest) { - for i := range requests { + for _, j := range jobs { + go func(jobs []job) { + for i, j := range jobs { mu.RLock() - if results[i].Error == nil { + if results[j.index].Error == nil { // We've already checked this message and it passed the signature checks. // So we can skip to the next message. mu.RUnlock() continue } mu.RUnlock() - for _, keyID := range keyIDs[i] { - serverKey, ok := keys[PublicKeyLookupRequest{requests[i].ServerName, keyID}] + for _, keyID := range keyIDs[j.index] { + serverKey, ok := keys[PublicKeyLookupRequest{j.request.ServerName, keyID}] if !ok { // No key for this key ID so we continue onto the next key ID. continue } - if !serverKey.WasValidAt(requests[i].AtTS, requests[i].StrictValidityChecking) { + if !serverKey.WasValidAt(j.request.AtTS, j.request.StrictValidityChecking) { // The key wasn't valid at the timestamp we needed it to be valid at. // So skip onto the next key. mu.Lock() - results[i].Error = fmt.Errorf( + results[j.index].Error = fmt.Errorf( "gomatrixserverlib: key with ID %q for %q not valid at %d", - keyID, requests[i].ServerName, requests[i].AtTS, + keyID, j.request.ServerName, j.request.AtTS, ) mu.Unlock() continue } if err := VerifyJSON( - string(requests[i].ServerName), keyID, ed25519.PublicKey(serverKey.Key), requests[i].Message, + string(j.request.ServerName), keyID, ed25519.PublicKey(serverKey.Key), j.request.Message, ); err != nil { // The signature wasn't valid, record the error and try the next key ID. mu.Lock() - results[i].Error = err + results[j.index].Error = err mu.Unlock() continue } // The signature is valid, set the result to nil. mu.Lock() - results[i].Error = nil + results[j.index].Error = nil mu.Unlock() break } } wg.Done() - }(r) + }(j) } wg.Wait() } From 1d81ae0b5fc5db94e064ddfae0b1b9d834ba5e19 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Oct 2020 14:32:05 +0100 Subject: [PATCH 3/4] Unused var --- keyring.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keyring.go b/keyring.go index a35f597c..974de9e2 100644 --- a/keyring.go +++ b/keyring.go @@ -347,7 +347,7 @@ func (k *KeyRing) checkUsingKeys( wg.Add(len(jobs)) for _, j := range jobs { go func(jobs []job) { - for i, j := range jobs { + for _, j := range jobs { mu.RLock() if results[j.index].Error == nil { // We've already checked this message and it passed the signature checks. From 3a984d70ba40734a94c065660c09043f201acc85 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 5 Oct 2020 14:40:15 +0100 Subject: [PATCH 4/4] Remove output logging --- keyring.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/keyring.go b/keyring.go index 974de9e2..8484ea99 100644 --- a/keyring.go +++ b/keyring.go @@ -331,19 +331,15 @@ func (k *KeyRing) checkUsingKeys( procs = 1 } type job struct { - index int - request VerifyJSONRequest + index int // the original index in the requests/results array + request VerifyJSONRequest // the request itself } jobs := make(map[int][]job) for i := range requests { jobs[i%procs] = append(jobs[i%procs], job{i, requests[i]}) } - fmt.Println("Balancing jobs across", procs, "queues") - for i, r := range jobs { - fmt.Println("*", i, "has", len(r), "jobs") - } - var wg sync.WaitGroup - var mu sync.RWMutex + var wg sync.WaitGroup // tracks the workers + var mu sync.RWMutex // protects results array wg.Add(len(jobs)) for _, j := range jobs { go func(jobs []job) {