From 2457c76015c627f2cd716645ab22cbcdbbf31d0b Mon Sep 17 00:00:00 2001 From: Tobias Bachert Date: Wed, 24 Aug 2022 14:52:04 +0200 Subject: [PATCH] Prevent concurrent `::export()` calls in span processor (#788) * Prevent concurrent `::export()` calls in batch span processor * Prevent concurrent `::export()` calls in simple span processor * Allow disabling auto-flush in batch processor `::onEnd()` * Include in-flight batches in queue limit * Handle exporter exceptions to prevent termination of worker * Use `LogsMessagesTrait` --- .../SpanProcessor/BatchSpanProcessor.php | 223 ++++++++++------ .../SpanProcessor/SimpleSpanProcessor.php | 87 +++++-- src/SDK/Trace/SpanProcessorFactory.php | 14 +- tests/Benchmark/OtlpBench.php | 3 +- .../SpanProcessor/BatchSpanProcessorTest.php | 240 +++++++++++++----- .../SpanProcessor/SimpleSpanProcessorTest.php | 81 +++++- .../SDK/Trace/SpanProcessorFactoryTest.php | 2 +- 7 files changed, 488 insertions(+), 162 deletions(-) diff --git a/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php b/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php index 9d794032a..e7837fca0 100644 --- a/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php +++ b/src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php @@ -4,142 +4,213 @@ namespace OpenTelemetry\SDK\Trace\SpanProcessor; +use function assert; +use function count; use InvalidArgumentException; use OpenTelemetry\Context\Context; -use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait; -use OpenTelemetry\SDK\Common\Environment\Variables as Env; +use OpenTelemetry\SDK\Behavior\LogsMessagesTrait; use OpenTelemetry\SDK\Common\Future\CancellationInterface; -use OpenTelemetry\SDK\Common\Time\ClockFactory; use OpenTelemetry\SDK\Common\Time\ClockInterface; -use OpenTelemetry\SDK\Common\Time\StopWatch; -use OpenTelemetry\SDK\Common\Time\StopWatchFactory; -use OpenTelemetry\SDK\Common\Time\Util as TimeUtil; use OpenTelemetry\SDK\Trace\ReadableSpanInterface; use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; use OpenTelemetry\SDK\Trace\SpanDataInterface; use OpenTelemetry\SDK\Trace\SpanExporterInterface; use OpenTelemetry\SDK\Trace\SpanProcessorInterface; +use SplQueue; +use function sprintf; +use Throwable; class BatchSpanProcessor implements SpanProcessorInterface { - use EnvironmentVariablesTrait; + use LogsMessagesTrait; public const DEFAULT_SCHEDULE_DELAY = 5000; public const DEFAULT_EXPORT_TIMEOUT = 30000; public const DEFAULT_MAX_QUEUE_SIZE = 2048; public const DEFAULT_MAX_EXPORT_BATCH_SIZE = 512; - private ?SpanExporterInterface $exporter; - private ?int $maxQueueSize; - private ?int $scheduledDelayMillis; - // @todo: Please, check if this code is needed. It creates an error in phpstan, since it's not used - /** @phpstan-ignore-next-line */ - private ?int $exporterTimeoutMillis; - private ?int $maxExportBatchSize; - private bool $running = true; - private StopWatch $stopwatch; - + private SpanExporterInterface $exporter; + private ClockInterface $clock; + private int $maxQueueSize; + private int $scheduledDelayNanos; + private int $maxExportBatchSize; + private bool $autoFlush; + + private ?int $nextScheduledRun = null; + private bool $running = false; + private int $batchId = 0; + private int $queueSize = 0; /** @var list */ - private array $queue = []; + private array $batch = []; + /** @var SplQueue> */ + private SplQueue $queue; + /** @var SplQueue */ + private SplQueue $flush; + + private bool $closed = false; public function __construct( - ?SpanExporterInterface $exporter, - ClockInterface $clock = null, - int $maxQueueSize = null, - int $scheduledDelayMillis = null, - int $exporterTimeoutMillis = null, - int $maxExportBatchSize = null + SpanExporterInterface $exporter, + ClockInterface $clock, + int $maxQueueSize = self::DEFAULT_MAX_QUEUE_SIZE, + int $scheduledDelayMillis = self::DEFAULT_SCHEDULE_DELAY, + int $exportTimeoutMillis = self::DEFAULT_EXPORT_TIMEOUT, + int $maxExportBatchSize = self::DEFAULT_MAX_EXPORT_BATCH_SIZE, + bool $autoFlush = true ) { - $this->exporter = $exporter; - // @todo make the stopwatch a dependency rather than using the factory? - $this->stopwatch = StopWatchFactory::create($clock ?? ClockFactory::getDefault())->build(); - $this->stopwatch->start(); - $this->maxQueueSize = $maxQueueSize - ?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, self::DEFAULT_MAX_QUEUE_SIZE); - $this->scheduledDelayMillis = $scheduledDelayMillis - ?: $this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, self::DEFAULT_SCHEDULE_DELAY); - $this->exporterTimeoutMillis = $exporterTimeoutMillis - ?: $this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, self::DEFAULT_EXPORT_TIMEOUT); - $this->maxExportBatchSize = $maxExportBatchSize - ?: $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, self::DEFAULT_MAX_EXPORT_BATCH_SIZE); - if ($this->maxExportBatchSize > $this->maxQueueSize) { - throw new InvalidArgumentException( - sprintf('maxExportBatchSize should be smaller or equal to %s', $this->maxQueueSize) - ); + if ($maxQueueSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum queue size (%d) must be greater than zero', $maxQueueSize)); + } + if ($scheduledDelayMillis <= 0) { + throw new InvalidArgumentException(sprintf('Scheduled delay (%d) must be greater than zero', $scheduledDelayMillis)); + } + if ($exportTimeoutMillis <= 0) { + throw new InvalidArgumentException(sprintf('Export timeout (%d) must be greater than zero', $exportTimeoutMillis)); + } + if ($maxExportBatchSize <= 0) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be greater than zero', $maxExportBatchSize)); } + if ($maxExportBatchSize > $maxQueueSize) { + throw new InvalidArgumentException(sprintf('Maximum export batch size (%d) must be less than or equal to maximum queue size (%d)', $maxExportBatchSize, $maxQueueSize)); + } + + $this->exporter = $exporter; + $this->clock = $clock; + $this->maxQueueSize = $maxQueueSize; + $this->scheduledDelayNanos = $scheduledDelayMillis * 1_000_000; + $this->maxExportBatchSize = $maxExportBatchSize; + $this->autoFlush = $autoFlush; + + $this->queue = new SplQueue(); + $this->flush = new SplQueue(); } - /** - * @inheritDoc - */ public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void { } - /** - * @inheritDoc - */ public function onEnd(ReadableSpanInterface $span): void { - if (null === $this->exporter) { + if ($this->closed) { return; } - - if (!$this->running) { + if (!$span->getContext()->isSampled()) { return; } - if ($span->getContext()->isSampled() && !$this->queueReachedLimit()) { - $this->queue[] = $span->toSpanData(); + if ($this->queueSize === $this->maxQueueSize) { + return; } - if ($this->bufferReachedExportLimit() || $this->enoughTimeHasPassed()) { - $this->forceFlush(); + $this->queueSize++; + $this->batch[] = $span->toSpanData(); + $this->nextScheduledRun ??= $this->clock->now() + $this->scheduledDelayNanos; + + if (count($this->batch) === $this->maxExportBatchSize) { + $this->enqueueBatch(); + } + if ($this->autoFlush) { + $this->flush(); } } - /** @inheritDoc */ public function forceFlush(?CancellationInterface $cancellation = null): bool { - if (!$this->running || $this->exporter === null) { - return true; + if ($this->closed) { + return false; } - $this->exporter->export($this->queue)->await(); - $this->queue = []; - $this->stopwatch->reset(); - $this->exporter->forceFlush(); - - return true; + return $this->flush(__FUNCTION__, $cancellation); } - /** @inheritDoc */ public function shutdown(?CancellationInterface $cancellation = null): bool { - if (!$this->running) { - return true; + if ($this->closed) { + return false; } - if (null !== $this->exporter && $this->forceFlush()) { - $this->exporter->shutdown(); - } - $this->running = false; + $this->closed = true; - return true; + return $this->flush(__FUNCTION__, $cancellation); } - protected function bufferReachedExportLimit(): bool + private function flush(?string $flushMethod = null, ?CancellationInterface $cancellation = null): bool { - return count($this->queue) >= $this->maxExportBatchSize; + if ($flushMethod !== null) { + $flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch; + $this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]); + } + + if ($this->running) { + return false; + } + + $success = true; + $exception = null; + $this->running = true; + + try { + for (;;) { + while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) { + [, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue(); + + try { + $result = $this->exporter->$flushMethod($cancellation); + if ($propagateResult) { + $success = $result; + } + } catch (Throwable $e) { + if ($propagateResult) { + $exception = $e; + + continue; + } + self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]); + } + } + + if (!$this->shouldFlush()) { + break; + } + + if ($this->queue->isEmpty()) { + $this->enqueueBatch(); + } + $batchSize = count($this->queue->bottom()); + $this->batchId++; + + try { + $this->exporter->export($this->queue->dequeue())->await(); + } catch (Throwable $e) { + self::logError('Unhandled export error', ['exception' => $e]); + } finally { + $this->queueSize -= $batchSize; + } + } + } finally { + $this->running = false; + } + + if ($exception !== null) { + throw $exception; + } + + return $success; } - protected function queueReachedLimit(): bool + private function shouldFlush(): bool { - return count($this->queue) >= $this->maxQueueSize; + return !$this->flush->isEmpty() + || $this->autoFlush && !$this->queue->isEmpty() + || $this->autoFlush && $this->nextScheduledRun !== null && $this->clock->now() > $this->nextScheduledRun; } - protected function enoughTimeHasPassed(): bool + private function enqueueBatch(): void { - return TimeUtil::millisToNanos((int) $this->scheduledDelayMillis) < $this->stopwatch->getLastElapsedTime(); + assert($this->batch !== []); + + $this->queue->enqueue($this->batch); + $this->batch = []; + $this->nextScheduledRun = null; } } diff --git a/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php b/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php index c8d7cc610..c4c7003f6 100644 --- a/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php +++ b/src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php @@ -4,6 +4,7 @@ namespace OpenTelemetry\SDK\Trace\SpanProcessor; +use Closure; use OpenTelemetry\Context\Context; use OpenTelemetry\SDK\Behavior\LogsMessagesTrait; use OpenTelemetry\SDK\Common\Future\CancellationInterface; @@ -11,56 +12,104 @@ use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; use OpenTelemetry\SDK\Trace\SpanExporterInterface; use OpenTelemetry\SDK\Trace\SpanProcessorInterface; +use SplQueue; +use function sprintf; +use Throwable; class SimpleSpanProcessor implements SpanProcessorInterface { use LogsMessagesTrait; - private ?SpanExporterInterface $exporter; - private bool $running = true; + private SpanExporterInterface $exporter; - public function __construct(SpanExporterInterface $exporter = null) + private bool $running = false; + /** @var SplQueue */ + private SplQueue $queue; + + private bool $closed = false; + + public function __construct(SpanExporterInterface $exporter) { $this->exporter = $exporter; + + $this->queue = new SplQueue(); } - /** @inheritDoc */ public function onStart(ReadWriteSpanInterface $span, Context $parentContext): void { } - /** @inheritDoc */ public function onEnd(ReadableSpanInterface $span): void { - if (!$this->running || !$span->getContext()->isSampled()) { + if ($this->closed) { return; } - - if (null !== $this->exporter) { - $this->exporter->export([$span->toSpanData()])->await(); + if (!$span->getContext()->isSampled()) { + return; } + + $spanData = $span->toSpanData(); + $this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export'); } - /** @inheritDoc */ public function forceFlush(?CancellationInterface $cancellation = null): bool { - return true; + if ($this->closed) { + return false; + } + + return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true); } - /** @inheritDoc */ public function shutdown(?CancellationInterface $cancellation = null): bool { - if (!$this->running) { - return true; + if ($this->closed) { + return false; } - $this->running = false; - self::logDebug('Shutting down span processor'); + $this->closed = true; + + return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true); + } + + private function flush(Closure $task, string $taskName, bool $propagateResult = false): bool + { + $this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running]); + + if ($this->running) { + return false; + } + + $success = true; + $exception = null; + $this->running = true; + + try { + while (!$this->queue->isEmpty()) { + [$task, $taskName, $propagateResult] = $this->queue->dequeue(); + + try { + $result = $task(); + if ($propagateResult) { + $success = $result; + } + } catch (Throwable $e) { + if ($propagateResult) { + $exception = $e; + + continue; + } + self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]); + } + } + } finally { + $this->running = false; + } - if (null !== $this->exporter) { - return $this->forceFlush() && $this->exporter->shutdown(); + if ($exception !== null) { + throw $exception; } - return true; + return $success; } } diff --git a/src/SDK/Trace/SpanProcessorFactory.php b/src/SDK/Trace/SpanProcessorFactory.php index 948b3de98..5709af0c7 100644 --- a/src/SDK/Trace/SpanProcessorFactory.php +++ b/src/SDK/Trace/SpanProcessorFactory.php @@ -8,6 +8,7 @@ use OpenTelemetry\SDK\Common\Environment\EnvironmentVariablesTrait; use OpenTelemetry\SDK\Common\Environment\KnownValues as Values; use OpenTelemetry\SDK\Common\Environment\Variables as Env; +use OpenTelemetry\SDK\Common\Time\ClockFactory; use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor; use OpenTelemetry\SDK\Trace\SpanProcessor\NoopSpanProcessor; use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor; @@ -18,10 +19,21 @@ class SpanProcessorFactory public function fromEnvironment(?SpanExporterInterface $exporter = null): SpanProcessorInterface { + if ($exporter === null) { + return new NoopSpanProcessor(); + } + $name = $this->getEnumFromEnvironment(Env::OTEL_PHP_TRACES_PROCESSOR); switch ($name) { case Values::VALUE_BATCH: - return new BatchSpanProcessor($exporter); + return new BatchSpanProcessor( + $exporter, + ClockFactory::getDefault(), + $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_QUEUE_SIZE, BatchSpanProcessor::DEFAULT_MAX_QUEUE_SIZE), + $this->getIntFromEnvironment(Env::OTEL_BSP_SCHEDULE_DELAY, BatchSpanProcessor::DEFAULT_SCHEDULE_DELAY), + $this->getIntFromEnvironment(Env::OTEL_BSP_EXPORT_TIMEOUT, BatchSpanProcessor::DEFAULT_EXPORT_TIMEOUT), + $this->getIntFromEnvironment(Env::OTEL_BSP_MAX_EXPORT_BATCH_SIZE, BatchSpanProcessor::DEFAULT_MAX_EXPORT_BATCH_SIZE), + ); case Values::VALUE_SIMPLE: return new SimpleSpanProcessor($exporter); case Values::VALUE_NOOP: diff --git a/tests/Benchmark/OtlpBench.php b/tests/Benchmark/OtlpBench.php index a48d469a7..c19bdc546 100644 --- a/tests/Benchmark/OtlpBench.php +++ b/tests/Benchmark/OtlpBench.php @@ -14,6 +14,7 @@ use OpenTelemetry\SDK\Resource\ResourceInfo; use OpenTelemetry\SDK\Trace\Sampler\AlwaysOnSampler; use OpenTelemetry\SDK\Trace\SamplerInterface; +use OpenTelemetry\SDK\Trace\SpanProcessor\NoopSpanProcessor; use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor; use OpenTelemetry\SDK\Trace\TracerProvider; use Psr\Http\Client\ClientInterface; @@ -44,7 +45,7 @@ public function __construct() public function setUpNoExporter(): void { - $processor = new SimpleSpanProcessor(); + $processor = new NoopSpanProcessor(); $provider = new TracerProvider($processor, $this->sampler, $this->resource); $this->tracer = $provider->getTracer('io.opentelemetry.contrib.php'); } diff --git a/tests/Unit/SDK/Trace/SpanProcessor/BatchSpanProcessorTest.php b/tests/Unit/SDK/Trace/SpanProcessor/BatchSpanProcessorTest.php index 13544b472..10920340c 100644 --- a/tests/Unit/SDK/Trace/SpanProcessor/BatchSpanProcessorTest.php +++ b/tests/Unit/SDK/Trace/SpanProcessor/BatchSpanProcessorTest.php @@ -4,29 +4,30 @@ namespace OpenTelemetry\Tests\Unit\SDK\Trace\SpanProcessor; -use AssertWell\PHPUnitGlobalState\EnvironmentVariables; -use Exception; +use InvalidArgumentException; +use LogicException; use Mockery; use Mockery\Adapter\Phpunit\MockeryTestCase; use OpenTelemetry\API\Trace as API; use OpenTelemetry\Context\Context; use OpenTelemetry\SDK\Common\Future\CompletedFuture; +use OpenTelemetry\SDK\Common\Log\LoggerHolder; use OpenTelemetry\SDK\Common\Time\ClockFactory; use OpenTelemetry\SDK\Common\Time\ClockInterface; use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; use OpenTelemetry\SDK\Trace\SpanDataInterface; use OpenTelemetry\SDK\Trace\SpanExporterInterface; use OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor; +use OpenTelemetry\SDK\Trace\SpanProcessorInterface; use OpenTelemetry\Tests\Unit\SDK\Util\TestClock; -use ReflectionObject; +use Psr\Log\LoggerInterface; +use Psr\Log\LogLevel; /** - * @covers OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor + * @covers \OpenTelemetry\SDK\Trace\SpanProcessor\BatchSpanProcessor */ class BatchSpanProcessorTest extends MockeryTestCase { - use EnvironmentVariables; - private TestClock $testClock; protected function setUp(): void @@ -39,18 +40,6 @@ protected function setUp(): void protected function tearDown(): void { ClockFactory::setDefault(null); - $this->restoreEnvironmentVariables(); - } - - public function test_allows_null_exporter(): void - { - $proc = new BatchSpanProcessor(null, $this->testClock); - $span = $this->createSampledSpanMock(); - $proc->onStart($span, Context::getCurrent()); - $proc->onEnd($span); - $proc->forceFlush(); - $proc->shutdown(); - $this->assertTrue(true); // phpunit requires an assertion } public function test_export_batch_size_met(): void @@ -83,27 +72,6 @@ public function test_export_batch_size_met(): void } } - public function test_export_batch_size_greater_than_queue_size_is_rejected(): void - { - $batchSize = 3; - $queueSize = 2; // queue is smaller than batch - $exportDelay = 3; - $timeout = 3000; - - $exporter = $this->createMock(SpanExporterInterface::class); - - $this->expectException(\InvalidArgumentException::class); - /** @var SpanExporterInterface $exporter */ - $processor = new BatchSpanProcessor( - $exporter, - $this->testClock, - $queueSize, - $exportDelay, - $timeout, - $batchSize - ); - } - /** * @dataProvider scheduledDelayProvider */ @@ -119,7 +87,7 @@ public function test_export_scheduled_delay(int $exportDelay, int $advanceByNano } $exporter = $this->createMock(SpanExporterInterface::class); - $exporter->expects($this->exactly($expectedFlush ? 1 : 0))->method('forceFlush'); + $exporter->expects($this->exactly($expectedFlush ? 1 : 0))->method('export'); /** @var SpanExporterInterface $exporter */ $processor = new BatchSpanProcessor( @@ -162,7 +130,6 @@ public function test_export_delay_limit_reached_partially_filled_batch(): void } $exporter = Mockery::mock(SpanExporterInterface::class); - $exporter->expects('forceFlush'); $exporter ->expects('export') ->with( @@ -328,6 +295,57 @@ function (array $spans) { $processor->forceFlush(); } + public function test_queue_size_exceeded_drops_spans(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $processor = new BatchSpanProcessor($exporter, $this->testClock, 5, 5000, 30000, 5); + + $exporter->expects($this->exactly(2))->method('export')->willReturnCallback(function (iterable $batch) use ($processor, &$i) { + if ($i) { + $this->assertCount(3, $batch); + } else { + for ($i = 0; $i < 5; $i++) { + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + } + } + + return 0; + }); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $processor->forceFlush(); + $processor->forceFlush(); + } + + public function test_force_flush_applies_only_to_current_spans(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $processor = new BatchSpanProcessor($exporter, $this->testClock); + + $exporter->expects($this->exactly(1))->method('export')->willReturnCallback(function (iterable $batch) use ($processor) { + $this->assertCount(1, $batch); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + return 0; + }); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $processor->forceFlush(); + } + public function test_shutdown_shutdowns_exporter(): void { $exporter = $this->createMock(SpanExporterInterface::class); @@ -337,34 +355,136 @@ public function test_shutdown_shutdowns_exporter(): void $processor->shutdown(); } - public function test_create_from_environment_variables(): void + public function test_throwing_exporter_export(): void { $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willReturn(true); + $exporter->method('export')->willThrowException(new LogicException()); - $input = [ - ['OTEL_BSP_MAX_EXPORT_BATCH_SIZE', 'maxExportBatchSize', 1], - ['OTEL_BSP_MAX_QUEUE_SIZE', 'maxQueueSize', 2], - ['OTEL_BSP_SCHEDULE_DELAY', 'scheduledDelayMillis', 3], - ['OTEL_BSP_EXPORT_TIMEOUT', 'exporterTimeoutMillis', 4], - ]; - foreach ($input as $i) { - $this->setEnvironmentVariable($i[0], $i[2]); + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('log')->with(LogLevel::ERROR); + + $processor = new BatchSpanProcessor($exporter, $this->testClock); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $previousLogger = LoggerHolder::get(); + LoggerHolder::set($logger); + + try { + $processor->forceFlush(); + } finally { + LoggerHolder::set($previousLogger); } - $processor = new BatchSpanProcessor($exporter); - $reflection = new ReflectionObject($processor); - foreach ($input as $i) { - $attr = $reflection->getProperty($i[1]); - $attr->setAccessible(true); - $this->assertEquals($i[2], $attr->getValue($processor)); + } + + public function test_throwing_exporter_flush(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willThrowException(new LogicException()); + + $this->expectException(LogicException::class); + + $processor = new BatchSpanProcessor($exporter, $this->testClock); + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $processor->forceFlush(); + } + + public function test_throwing_exporter_flush_cannot_rethrow_in_original_caller_logs_error(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) { + /** @var SpanProcessorInterface $processor */ + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + return $processor->shutdown(); + }); + $exporter->method('shutdown')->willThrowException(new LogicException()); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('log')->with(LogLevel::ERROR); + + $processor = new BatchSpanProcessor($exporter, $this->testClock); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $previousLogger = LoggerHolder::get(); + LoggerHolder::set($logger); + + try { + $processor->forceFlush(); + } finally { + LoggerHolder::set($previousLogger); } } - public function test_create_non_numeric_environment_value_throws_exception(): void + public function test_throwing_exporter_flush_rethrows_in_original_caller(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willReturnCallback(function () use (&$processor) { + /** @var SpanProcessorInterface $processor */ + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + $processor->shutdown(); + + throw new LogicException(); + }); + $exporter->expects($this->once())->method('shutdown'); + + $this->expectException(LogicException::class); + + $processor = new BatchSpanProcessor($exporter, $this->testClock); + + $span = $this->createSampledSpanMock(); + $processor->onStart($span, Context::getCurrent()); + $processor->onEnd($span); + + $processor->forceFlush(); + } + + public function test_span_processor_throws_on_invalid_max_queue_size(): void + { + $this->expectException(InvalidArgumentException::class); + $exporter = $this->createMock(SpanExporterInterface::class); + new BatchSpanProcessor($exporter, $this->testClock, -1); + } + + public function test_span_processor_throws_on_invalid_scheduled_delay(): void + { + $this->expectException(InvalidArgumentException::class); + $exporter = $this->createMock(SpanExporterInterface::class); + new BatchSpanProcessor($exporter, $this->testClock, 2048, -1); + } + + public function test_span_processor_throws_on_invalid_export_timeout(): void + { + $this->expectException(InvalidArgumentException::class); + $exporter = $this->createMock(SpanExporterInterface::class); + new BatchSpanProcessor($exporter, $this->testClock, 2048, 5000, -1); + } + + public function test_span_processor_throws_on_invalid_max_export_batch_size(): void + { + $this->expectException(InvalidArgumentException::class); + $exporter = $this->createMock(SpanExporterInterface::class); + new BatchSpanProcessor($exporter, $this->testClock, 2048, 5000, 30000, -1); + } + + public function test_span_processor_throws_on_invalid_max_export_batch_size_exceeding_max_queue_size(): void { - $this->setEnvironmentVariable('OTEL_BSP_MAX_QUEUE_SIZE', 'fruit'); + $this->expectException(InvalidArgumentException::class); $exporter = $this->createMock(SpanExporterInterface::class); - $this->expectException(Exception::class); - new BatchSpanProcessor($exporter); + new BatchSpanProcessor($exporter, $this->testClock, 2, 5000, 30000, 3); } private function createSampledSpanMock() diff --git a/tests/Unit/SDK/Trace/SpanProcessor/SimpleSpanProcessorTest.php b/tests/Unit/SDK/Trace/SpanProcessor/SimpleSpanProcessorTest.php index 518c8bdcf..1bfe4a6f3 100644 --- a/tests/Unit/SDK/Trace/SpanProcessor/SimpleSpanProcessorTest.php +++ b/tests/Unit/SDK/Trace/SpanProcessor/SimpleSpanProcessorTest.php @@ -4,6 +4,7 @@ namespace OpenTelemetry\Tests\Unit\SDK\Trace\SpanProcessor; +use LogicException; use Mockery; use Mockery\Adapter\Phpunit\MockeryTestCase; use Mockery\MockInterface; @@ -11,11 +12,14 @@ use OpenTelemetry\API\Trace\SpanContextInterface; use OpenTelemetry\Context\Context; use OpenTelemetry\SDK\Common\Future\CompletedFuture; +use OpenTelemetry\SDK\Common\Log\LoggerHolder; use OpenTelemetry\SDK\Trace\ReadableSpanInterface; use OpenTelemetry\SDK\Trace\ReadWriteSpanInterface; use OpenTelemetry\SDK\Trace\SpanExporterInterface; use OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor; use OpenTelemetry\Tests\Unit\SDK\Util\SpanData; +use Psr\Log\LoggerInterface; +use Psr\Log\LogLevel; /** * @covers \OpenTelemetry\SDK\Trace\SpanProcessor\SimpleSpanProcessor @@ -59,6 +63,14 @@ public function test_on_start(): void $this->spanExporter->shouldNotReceive('export'); } + public function test_on_end_after_shutdown(): void + { + $this->spanExporter->shouldReceive('shutdown'); + $this->spanExporter->shouldNotReceive('export'); + $this->simpleSpanProcessor->shutdown(); + $this->simpleSpanProcessor->onEnd($this->readableSpan); + } + public function test_on_end_sampled_span(): void { $spanData = new SpanData(); @@ -76,24 +88,85 @@ public function test_on_end_non_sampled_span(): void $this->simpleSpanProcessor->onEnd($this->readableSpan); } + public function test_does_not_trigger_concurrent_export(): void + { + $spanData = new SpanData(); + $count = 3; + $this->readableSpan->expects('getContext')->times($count)->andReturn($this->sampledSpanContext); + $this->readableSpan->expects('toSpanData')->times($count)->andReturn($spanData); + + $this->spanExporter->expects('export')->times($count)->andReturnUsing(function () use (&$running, &$count) { + $this->assertNotTrue($running); + $running = true; + if (--$count) { + $this->simpleSpanProcessor->onEnd($this->readableSpan); + } + $running = false; + + return 0; + }); + + $this->simpleSpanProcessor->onEnd($this->readableSpan); + } + // TODO: Add test to ensure exporter is retried on failure. public function test_force_flush(): void { + $this->spanExporter->expects('forceFlush')->andReturn(true); $this->assertTrue($this->simpleSpanProcessor->forceFlush()); } + public function test_force_flush_after_shutdown(): void + { + $this->spanExporter->expects('shutdown')->andReturn(true); + $this->spanExporter->shouldNotReceive('forceFlush'); + $this->simpleSpanProcessor->shutdown(); + $this->simpleSpanProcessor->forceFlush(); + } + public function test_shutdown(): void { $this->spanExporter->expects('shutdown')->andReturnTrue(); $this->assertTrue($this->simpleSpanProcessor->shutdown()); - $this->assertTrue($this->simpleSpanProcessor->shutdown()); + $this->assertFalse($this->simpleSpanProcessor->shutdown()); + } + + public function test_throwing_exporter_export(): void + { + $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willReturn(true); + $exporter->method('export')->willThrowException(new LogicException()); + + $logger = $this->createMock(LoggerInterface::class); + $logger->expects($this->once())->method('log')->with(LogLevel::ERROR); + + $processor = new SimpleSpanProcessor($exporter); + + $this->readableSpan->expects('getContext')->andReturn($this->sampledSpanContext); + $this->readableSpan->expects('toSpanData')->andReturn(new SpanData()); + + $previousLogger = LoggerHolder::get(); + LoggerHolder::set($logger); + + try { + $processor->onStart($this->readWriteSpan, Context::getCurrent()); + $processor->onEnd($this->readableSpan); + } finally { + LoggerHolder::set($previousLogger); + } } - public function test_shutdown_with_no_exporter(): void + public function test_throwing_exporter_flush(): void { - $processor = new SimpleSpanProcessor(null); - $this->assertTrue($processor->shutdown()); + $exporter = $this->createMock(SpanExporterInterface::class); + $exporter->method('forceFlush')->willThrowException(new LogicException()); + + $this->expectException(LogicException::class); + + $processor = new SimpleSpanProcessor($exporter); + + $processor->forceFlush(); } } diff --git a/tests/Unit/SDK/Trace/SpanProcessorFactoryTest.php b/tests/Unit/SDK/Trace/SpanProcessorFactoryTest.php index c9c227435..8f4594872 100644 --- a/tests/Unit/SDK/Trace/SpanProcessorFactoryTest.php +++ b/tests/Unit/SDK/Trace/SpanProcessorFactoryTest.php @@ -33,7 +33,7 @@ public function test_span_processor_factory_create_span_processor_from_environme { $this->setEnvironmentVariable('OTEL_PHP_TRACES_PROCESSOR', $processorName); $factory = new SpanProcessorFactory(); - $this->assertInstanceOf($expected, $factory->fromEnvironment()); + $this->assertInstanceOf($expected, $factory->fromEnvironment($this->createMock(SpanExporterInterface::class))); } public function processorProvider()