diff --git a/packages/interface/src/content-routing/index.ts b/packages/interface/src/content-routing/index.ts index 81e452988d..ab6b69f392 100644 --- a/packages/interface/src/content-routing/index.ts +++ b/packages/interface/src/content-routing/index.ts @@ -1,4 +1,4 @@ -import type { AbortOptions, RoutingOptions } from '../index.js' +import type { RoutingOptions } from '../index.js' import type { PeerInfo } from '../peer-info/index.js' import type { CID } from 'multiformats/cid' @@ -50,7 +50,7 @@ export interface ContentRouting { * provide content corresponding to the passed CID, call this function to no * longer remind them. */ - cancelReprovide (key: CID, options?: AbortOptions): Promise + cancelReprovide (key: CID, options?: RoutingOptions): Promise /** * Find the providers of the passed CID. diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 4961c77ada..0c039a58e9 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -753,13 +753,22 @@ export interface LoggerOptions { log: Logger } +/** + * An object that includes a context object that is passed onwards. + * + * This is used by metrics method tracing to link function calls together. + */ +export interface ContextOptions { + context?: any +} + /** * When a routing operation involves reading values, these options allow * controlling where the values are read from. By default libp2p will check * local caches but may not use the network if a valid local value is found, * these options allow tuning that behaviour. */ -export interface RoutingOptions extends AbortOptions, ProgressOptions { +export interface RoutingOptions extends AbortOptions, ProgressOptions, ContextOptions { /** * Pass `false` to not use the network * diff --git a/packages/interface/src/metrics/index.ts b/packages/interface/src/metrics/index.ts index 86284a359b..e6a3056504 100644 --- a/packages/interface/src/metrics/index.ts +++ b/packages/interface/src/metrics/index.ts @@ -488,4 +488,61 @@ export interface Metrics { * method on the returned summary group object */ registerSummaryGroup: ((name: string, options?: SummaryOptions) => SummaryGroup) & ((name: string, options: CalculatedSummaryOptions>) => void) + + /** + * Wrap a function for tracing purposes. + * + * All functions wrapped like this should accept a final optional options arg. + * + * In order to pass an execution context along to create a multi-layered + * trace, the index of the options arg must be specified. + */ + traceFunction AsyncIterator> (name: string, fn: F, options?: TraceGeneratorFunctionOptions, ReturnType, YieldType>>): F + traceFunction Iterator> (name: string, fn: F, options?: TraceGeneratorFunctionOptions, ReturnType, YieldType>>): F + traceFunction any = (...args: any[]) => any> (name: string, fn: F, options?: TraceFunctionOptions, ReturnType>): F + + /** + * Creates a tracing context that can be used to trace a method call + */ + createTraceContext(): any +} + +/** + * Infer the yielded type of an (async)iterable + */ +type YieldType | Iterator> = T extends AsyncIterator ? Y : T extends Iterator ? Y : never + +export type TraceAttributes = Record + +export interface TraceFunctionOptions { + /** + * To construct a trace that spans multiple method invocations, it's necessary + * to pass the trace context onwards as part of the options object. + * + * Specify the index of the options object in the args array here. + * + * @default 0 + */ + optionsIndex?: number + + /** + * Set attributes on the trace by modifying the passed attributes object. + */ + getAttributesFromArgs?(args: A, attributes: TraceAttributes): TraceAttributes + + /** + * Set attributes on the trace by modifying the passed attributes object. The + * object will have previously been passed to `appendAttributesFromArgs` + * and/or `appendAttributesFromYieldedValue` (if defined) + */ + getAttributesFromReturnValue?(value: B, attributes: TraceAttributes): TraceAttributes +} + +export interface TraceGeneratorFunctionOptions extends TraceFunctionOptions { + /** + * Set attributes on the trace by modifying the passed attributes object. The + * object will have previously been passed to `appendAttributesFromArgs` (if + * defined) + */ + getAttributesFromYieldedValue? (value: C, attributes: TraceAttributes, index: number): TraceAttributes } diff --git a/packages/kad-dht/src/content-fetching/index.ts b/packages/kad-dht/src/content-fetching/index.ts index 4e9d5cd0a7..4b01c4159c 100644 --- a/packages/kad-dht/src/content-fetching/index.ts +++ b/packages/kad-dht/src/content-fetching/index.ts @@ -55,6 +55,13 @@ export class ContentFetching { this.peerRouting = peerRouting this.queryManager = queryManager this.network = network + + this.get = components.metrics?.traceFunction('libp2p.kadDHT.get', this.get.bind(this), { + optionsIndex: 1 + }) ?? this.get + this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), { + optionsIndex: 2 + }) ?? this.put } /** @@ -145,7 +152,10 @@ export class ContentFetching { // put record to the closest peers yield * pipe( - this.peerRouting.getClosestPeers(key, { signal: options.signal }), + this.peerRouting.getClosestPeers(key, { + ...options, + signal: options.signal + }), (source) => map(source, (event) => { return async () => { if (event.name !== 'FINAL_PEER') { @@ -252,7 +262,10 @@ export class ContentFetching { const self = this // eslint-disable-line @typescript-eslint/no-this-alias const getValueQuery: QueryFunc = async function * ({ peer, signal }) { - for await (const event of self.peerRouting.getValueOrPeers(peer, key, { signal })) { + for await (const event of self.peerRouting.getValueOrPeers(peer, key, { + ...options, + signal + })) { yield event if (event.name === 'PEER_RESPONSE' && (event.record != null)) { diff --git a/packages/kad-dht/src/content-routing/index.ts b/packages/kad-dht/src/content-routing/index.ts index 3f65af20bf..932c348580 100644 --- a/packages/kad-dht/src/content-routing/index.ts +++ b/packages/kad-dht/src/content-routing/index.ts @@ -50,6 +50,29 @@ export class ContentRouting { this.queryManager = queryManager this.routingTable = routingTable this.providers = providers + + this.findProviders = components.metrics?.traceFunction('libp2p.kadDHT.findProviders', this.findProviders.bind(this), { + optionsIndex: 1, + getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => { + if (event.name === 'PROVIDER') { + attrs.providers ??= [] + attrs.providers.push(...event.providers.map(info => info.id.toString())) + } + + return attrs + } + }) ?? this.findProviders + this.provide = components.metrics?.traceFunction('libp2p.kadDHT.provide', this.provide.bind(this), { + optionsIndex: 1, + getAttributesFromYieldedValue: (event, attrs: { providers?: string[] }) => { + if (event.name === 'PEER_RESPONSE' && event.messageName === 'ADD_PROVIDER') { + attrs.providers ??= [] + attrs.providers.push(event.from.toString()) + } + + return attrs + } + }) ?? this.provide } /** diff --git a/packages/kad-dht/src/network.ts b/packages/kad-dht/src/network.ts index 2273795be4..2d142b3cdc 100644 --- a/packages/kad-dht/src/network.ts +++ b/packages/kad-dht/src/network.ts @@ -57,6 +57,61 @@ export class Network extends TypedEventEmitter implements Startab operations: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_requests_total`), errors: components.metrics?.registerCounterGroup(`${init.metricsPrefix}_outbound_rpc_errors_total`) } + + this.sendRequest = components.metrics?.traceFunction('libp2p.kadDHT.sendRequest', this.sendRequest.bind(this), { + optionsIndex: 2, + getAttributesFromArgs ([to, message], attrs) { + return { + ...attrs, + to: to.toString(), + 'message type': `${message.type}` + } + }, + getAttributesFromYieldedValue: (event, attrs) => { + if (event.name === 'PEER_RESPONSE') { + if (event.providers.length > 0) { + event.providers.forEach((value, index) => { + attrs[`providers-${index}`] = value.id.toString() + }) + } + + if (event.closer.length > 0) { + event.closer.forEach((value, index) => { + attrs[`closer-${index}`] = value.id.toString() + }) + } + } + + return attrs + } + }) ?? this.sendRequest + this.sendMessage = components.metrics?.traceFunction('libp2p.kadDHT.sendMessage', this.sendMessage.bind(this), { + optionsIndex: 2, + getAttributesFromArgs ([to, message], attrs) { + return { + ...attrs, + to: to.toString(), + 'message type': `${message.type}` + } + }, + getAttributesFromYieldedValue: (event, attrs) => { + if (event.name === 'PEER_RESPONSE') { + if (event.providers.length > 0) { + event.providers.forEach((value, index) => { + attrs[`providers-${index}`] = value.id.toString() + }) + } + + if (event.closer.length > 0) { + event.closer.forEach((value, index) => { + attrs[`closer-${index}`] = value.id.toString() + }) + } + } + + return attrs + } + }) ?? this.sendMessage } /** diff --git a/packages/kad-dht/src/peer-routing/index.ts b/packages/kad-dht/src/peer-routing/index.ts index 56ff87918c..0e27a4188b 100644 --- a/packages/kad-dht/src/peer-routing/index.ts +++ b/packages/kad-dht/src/peer-routing/index.ts @@ -22,12 +22,13 @@ import type { Network } from '../network.js' import type { QueryManager, QueryOptions } from '../query/manager.js' import type { QueryFunc } from '../query/types.js' import type { RoutingTable } from '../routing-table/index.js' -import type { ComponentLogger, Logger, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface' +import type { ComponentLogger, Logger, Metrics, PeerId, PeerInfo, PeerStore, RoutingOptions } from '@libp2p/interface' export interface PeerRoutingComponents { peerId: PeerId peerStore: PeerStore logger: ComponentLogger + metrics?: Metrics } export interface PeerRoutingInit { @@ -55,6 +56,13 @@ export class PeerRouting { this.peerStore = components.peerStore this.peerId = components.peerId this.log = components.logger.forComponent(`${init.logPrefix}:peer-routing`) + + this.findPeer = components.metrics?.traceFunction('libp2p.kadDHT.findPeer', this.findPeer.bind(this), { + optionsIndex: 1 + }) ?? this.findPeer + this.getClosestPeers = components.metrics?.traceFunction('libp2p.kadDHT.getClosestPeers', this.getClosestPeers.bind(this), { + optionsIndex: 1 + }) ?? this.getClosestPeers } /** diff --git a/packages/kad-dht/src/query-self.ts b/packages/kad-dht/src/query-self.ts index 966a544d02..979cf5b3a5 100644 --- a/packages/kad-dht/src/query-self.ts +++ b/packages/kad-dht/src/query-self.ts @@ -10,9 +10,8 @@ import { timeOperationMethod } from './utils.js' import type { OperationMetrics } from './kad-dht.js' import type { PeerRouting } from './peer-routing/index.js' import type { RoutingTable } from './routing-table/index.js' -import type { ComponentLogger, Logger, PeerId, Startable } from '@libp2p/interface' +import type { ComponentLogger, Logger, Metrics, PeerId, Startable } from '@libp2p/interface' import type { DeferredPromise } from 'p-defer' - export interface QuerySelfInit { logPrefix: string peerRouting: PeerRouting @@ -28,6 +27,7 @@ export interface QuerySelfInit { export interface QuerySelfComponents { peerId: PeerId logger: ComponentLogger + metrics?: Metrics } /** diff --git a/packages/kad-dht/src/query/manager.ts b/packages/kad-dht/src/query/manager.ts index e1e3b75d48..dacb11fa1b 100644 --- a/packages/kad-dht/src/query/manager.ts +++ b/packages/kad-dht/src/query/manager.ts @@ -171,6 +171,7 @@ export class QueryManager implements Startable { // Create query paths from the starting peers const paths = peersToQuery.map((peer, index) => { return queryPath({ + ...options, key, startingPeer: peer, ourPeerId: this.peerId, diff --git a/packages/kad-dht/src/query/query-path.ts b/packages/kad-dht/src/query/query-path.ts index a3b09d57a6..34ab9ba13c 100644 --- a/packages/kad-dht/src/query/query-path.ts +++ b/packages/kad-dht/src/query/query-path.ts @@ -121,6 +121,7 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator { + return { + ...attrs, + cid: cid.toString() + } + }, + getAttributesFromYieldedValue: (value, attrs: { providers?: string[] }) => { + return { + ...attrs, + providers: [...(Array.isArray(attrs.providers) ? attrs.providers : []), value.id.toString()] + } + } + }) ?? this.findProviders + this.provide = components.metrics?.traceFunction('libp2p.contentRouting.provide', this.provide.bind(this), { + optionsIndex: 1, + getAttributesFromArgs: ([cid], attrs) => { + return { + ...attrs, + cid: cid.toString() + } + } + }) ?? this.provide + this.cancelReprovide = components.metrics?.traceFunction('libp2p.contentRouting.cancelReprovide', this.cancelReprovide.bind(this), { + optionsIndex: 1, + getAttributesFromArgs: ([cid], attrs) => { + return { + ...attrs, + cid: cid.toString() + } + } + }) ?? this.cancelReprovide + this.put = components.metrics?.traceFunction('libp2p.contentRouting.put', this.put.bind(this), { + optionsIndex: 2, + getAttributesFromArgs: ([key]) => { + return { + key: uint8ArrayToString(key, 'base36') + } + } + }) ?? this.put + this.get = components.metrics?.traceFunction('libp2p.contentRouting.get', this.get.bind(this), { + optionsIndex: 1, + getAttributesFromArgs: ([key]) => { + return { + key: uint8ArrayToString(key, 'base36') + } + } + }) ?? this.get } readonly [Symbol.toStringTag] = '@libp2p/content-routing' @@ -43,7 +95,7 @@ export class CompoundContentRouting implements ContentRouting, Startable { /** * Iterates over all content routers in parallel to find providers of the given key */ - async * findProviders (key: CID, options: RoutingOptions = {}): AsyncIterable { + async * findProviders (key: CID, options: RoutingOptions = {}): AsyncGenerator { if (this.routers.length === 0) { throw new NoContentRoutersError('No content routers available') } diff --git a/packages/libp2p/src/peer-routing.ts b/packages/libp2p/src/peer-routing.ts index f3aefc5b0b..dd9330c474 100644 --- a/packages/libp2p/src/peer-routing.ts +++ b/packages/libp2p/src/peer-routing.ts @@ -2,8 +2,9 @@ import { NotFoundError } from '@libp2p/interface' import { createScalableCuckooFilter } from '@libp2p/utils/filters' import merge from 'it-merge' import parallel from 'it-parallel' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { NoPeerRoutersError, QueriedForSelfError } from './errors.js' -import type { Logger, PeerId, PeerInfo, PeerRouting, PeerStore, RoutingOptions } from '@libp2p/interface' +import type { Logger, Metrics, PeerId, PeerInfo, PeerRouting, PeerStore, RoutingOptions } from '@libp2p/interface' import type { ComponentLogger } from '@libp2p/logger' export interface PeerRoutingInit { @@ -14,6 +15,7 @@ export interface DefaultPeerRoutingComponents { peerId: PeerId peerStore: PeerStore logger: ComponentLogger + metrics?: Metrics } export class DefaultPeerRouting implements PeerRouting { @@ -27,6 +29,31 @@ export class DefaultPeerRouting implements PeerRouting { this.peerId = components.peerId this.peerStore = components.peerStore this.routers = init.routers ?? [] + + this.findPeer = components.metrics?.traceFunction('libp2p.peerRouting.findPeer', this.findPeer.bind(this), { + optionsIndex: 1, + getAttributesFromArgs: ([peer], attrs) => { + return { + ...attrs, + peer: peer.toString() + } + } + }) ?? this.findPeer + this.getClosestPeers = components.metrics?.traceFunction('libp2p.peerRouting.getClosestPeers', this.getClosestPeers.bind(this), { + optionsIndex: 1, + getAttributesFromArgs: ([key], attrs) => { + return { + ...attrs, + key: uint8ArrayToString(key, 'base36') + } + }, + getAttributesFromYieldedValue: (value, attrs: { peers?: string[] }) => { + return { + ...attrs, + peers: [...(Array.isArray(attrs.peers) ? attrs.peers : []), value.id.toString()] + } + } + }) ?? this.getClosestPeers } readonly [Symbol.toStringTag] = '@libp2p/peer-routing' @@ -75,7 +102,7 @@ export class DefaultPeerRouting implements PeerRouting { /** * Attempt to find the closest peers on the network to the given key */ - async * getClosestPeers (key: Uint8Array, options: RoutingOptions = {}): AsyncIterable { + async * getClosestPeers (key: Uint8Array, options: RoutingOptions = {}): AsyncGenerator { if (this.routers.length === 0) { throw new NoPeerRoutersError('No peer routers available') } diff --git a/packages/metrics-devtools/src/index.ts b/packages/metrics-devtools/src/index.ts index 8d3732d4cf..b30d82c7f3 100644 --- a/packages/metrics-devtools/src/index.ts +++ b/packages/metrics-devtools/src/index.ts @@ -239,6 +239,14 @@ class DevToolsMetrics implements Metrics, Startable { return this.simpleMetrics.registerSummaryGroup(name, options) } + createTraceContext (): any { + return this.simpleMetrics.createTraceContext() + } + + traceFunction any> (name: string, fn: T, options?: any): T { + return this.simpleMetrics.traceFunction(name, fn, options) + } + async start (): Promise { // send peer updates this.components.events.addEventListener('peer:connect', this.onPeersUpdate) diff --git a/packages/metrics-prometheus/src/index.ts b/packages/metrics-prometheus/src/index.ts index dff51ecfb2..3686ed74c2 100644 --- a/packages/metrics-prometheus/src/index.ts +++ b/packages/metrics-prometheus/src/index.ts @@ -490,6 +490,15 @@ class PrometheusMetrics implements Metrics { return metricGroup } } + + createTraceContext (): any { + // no-op + } + + traceFunction any> (name: string, fn: T): T { + // no-op + return fn + } } export function prometheusMetrics (init?: Partial): (components: PrometheusMetricsComponents) => Metrics { diff --git a/packages/metrics-simple/package.json b/packages/metrics-simple/package.json index 16c290a44c..f1516f7665 100644 --- a/packages/metrics-simple/package.json +++ b/packages/metrics-simple/package.json @@ -59,5 +59,6 @@ "@types/tdigest": "^0.1.4", "aegir": "^45.0.5", "p-defer": "^4.0.1" - } + }, + "sideEffects": false } diff --git a/packages/metrics-simple/src/index.ts b/packages/metrics-simple/src/index.ts index 8063efec78..f98ea421bd 100644 --- a/packages/metrics-simple/src/index.ts +++ b/packages/metrics-simple/src/index.ts @@ -551,6 +551,15 @@ class SimpleMetrics implements Metrics, Startable { return metric } + + createTraceContext (): any { + // no-op + } + + traceFunction any> (name: string, fn: T): T { + // no-op + return fn + } } export function simpleMetrics (init: SimpleMetricsInit): (components: unknown) => Metrics {