Skip to content

Commit

Permalink
Replace reliance on sleep with awaitility
Browse files Browse the repository at this point in the history
mrotteveel committed May 17, 2024
1 parent 9d7bb0f commit 79b7183
Showing 1 changed file with 61 additions and 55 deletions.
116 changes: 61 additions & 55 deletions src/test/org/firebirdsql/event/FBEventManagerTest.java
Original file line number Diff line number Diff line change
@@ -32,18 +32,23 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.File;
import java.lang.reflect.Field;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.with;
import static org.firebirdsql.common.FBTestProperties.*;
import static org.firebirdsql.common.matchers.SQLExceptionMatchers.errorCodeEquals;
@@ -129,36 +134,43 @@ void testWaitForEventNoEvent() throws Exception {
@Test
void testWaitForEventIndefinitely() throws Exception {
setupDefaultEventManager();
var eventWait = new EventWait("TEST_EVENT_B", 0);
var waitThread = new Thread(eventWait);
waitThread.start();
waitThread.join(1000);
if (waitThread.isAlive()) waitThread.interrupt();
assertEquals(0, eventWait.getEventCount());
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
Future<Integer> eventResult = executorService.submit(new EventWait("TEST_EVENT_B", 0));
await().during(1, TimeUnit.SECONDS).until(() -> !eventResult.isDone());
eventResult.cancel(true);
assertThrows(CancellationException.class, eventResult::get);
} finally {
executorService.shutdownNow();
}
}

@Test
void testWaitForEventWithOccurrence() throws Exception {
setupDefaultEventManager();
var eventWait = new EventWait("TEST_EVENT_B", 10000);
var waitThread = new Thread(eventWait);
waitThread.start();
Thread.sleep(SHORT_DELAY);
executeSql("INSERT INTO TEST VALUES (1)");
waitThread.join();
assertEquals(1, eventWait.getEventCount());
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
Future<Integer> eventResult = executorService.submit(new EventWait("TEST_EVENT_B", 10000));
executeSql("INSERT INTO TEST VALUES (1)");
await().atMost(1, TimeUnit.SECONDS).until(eventResult::isDone);
assertEquals(1, eventResult.get());
} finally {
executorService.shutdownNow();
}
}

@Test
void testWaitForEventWithOccurrenceNoTimeout() throws Exception {
setupDefaultEventManager();
var eventWait = new EventWait("TEST_EVENT_A", 0);
var waitThread = new Thread(eventWait);
waitThread.start();
Thread.sleep(SHORT_DELAY);
executeSql("INSERT INTO TEST VALUES (2)");
waitThread.join();
assertEquals(2, eventWait.getEventCount());
ExecutorService executorService = Executors.newSingleThreadExecutor();
try {
Future<Integer> eventResult = executorService.submit(new EventWait("TEST_EVENT_A", 0));
executeSql("INSERT INTO TEST VALUES (2)");
await().atMost(1, TimeUnit.SECONDS).until(eventResult::isDone);
assertEquals(2, eventResult.get());
} finally {
executorService.shutdownNow();
}
}

@Test
@@ -219,9 +231,10 @@ void testLargeMultiLoad() throws Exception {
final int threadCount = 5;
final int repetitionCount = 100;
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
var eventProducer = new EventProducer(repetitionCount);
try {
for (int i = 0; i < threadCount; i++) {
executorService.submit(new EventProducer(repetitionCount));
executorService.submit(eventProducer);
}
} finally {
executorService.shutdown();
@@ -240,9 +253,10 @@ void testLargeMultiLoad() throws Exception {
void testSlowCallback() throws Exception {
setupDefaultEventManager();
var ael = new AccumulatingEventListener() {
@SuppressWarnings("java:S2925")
public void eventOccurred(DatabaseEvent e) {
try {
Thread.sleep(300);
Thread.sleep(250);
} catch (InterruptedException ie) {
// ignore
}
@@ -255,16 +269,15 @@ public void eventOccurred(DatabaseEvent e) {
for (int i = 0; i < repetitionCount; i++) {
executeSql("INSERT INTO TEST VALUES (5)");
}
with().pollDelay(1500, TimeUnit.MILLISECONDS)
with().pollDelay(1200, TimeUnit.MILLISECONDS)
.await().until(ael::getTotalEvents, equalTo(repetitionCount));
}

@Test
void testMultipleManagersOnExistingConnectionOnOneEvent() throws Exception {
try (Connection connection = getConnectionViaDriverManager()) {
var eventManager1 = FBEventManager.createFor(connection);
var eventManager2 = FBEventManager.createFor(connection);

try (Connection connection = getConnectionViaDriverManager();
var eventManager1 = FBEventManager.createFor(connection);
var eventManager2 = FBEventManager.createFor(connection)) {
eventManager1.connect();
eventManager2.connect();

@@ -303,9 +316,6 @@ void testMultipleManagersOnExistingConnectionOnOneEvent() throws Exception {
assertEquals(4, ael5.getTotalEvents(), "ael5 totalEvents");
assertEquals(0, ael6.getTotalEvents(), "ael6 totalEvents");
});

eventManager1.disconnect();
eventManager2.disconnect();
}
}

@@ -351,29 +361,29 @@ void testEventManagerOnExistingException_connectThrowsException_afterConnectionC
}

@ParameterizedTest
@ValueSource(strings = { "BrokenSQLException", "FatalSQLException", "SocketException", "SocketTimeoutException" })
void testDefaultEventManagerDisconnectOnFatalException(String exceptionType) throws Exception {
@MethodSource("fatalExceptions")
void testDefaultEventManagerDisconnectOnFatalException(SQLException exception) throws Exception {
setupDefaultEventManager();
AbstractFbDatabase<?> db = (AbstractFbDatabase<?>) ((FBEventManager) eventManager).getFbDatabase();
Field eldField = AbstractFbAttachment.class.getDeclaredField("exceptionListenerDispatcher");
eldField.setAccessible(true);
ExceptionListenerDispatcher eld = (ExceptionListenerDispatcher) eldField.get(db);

SQLException exception = switch (exceptionType) {
case "BrokenSQLException" -> new SQLException("broken", "00000", ISCConstants.isc_net_write_err);
case "FatalSQLException" -> new SQLException("fatal not broken", "00000", ISCConstants.isc_req_sync);
case "SocketException" -> new SQLException(new SocketException());
case "SocketTimeoutException" -> new SQLException(new SocketTimeoutException());
default -> throw new IllegalArgumentException("Unexpected exceptionType: " + exceptionType);
};

assertTrue(eventManager.isConnected(), "expected connected event manager");

eld.errorOccurred(exception);

assertFalse(eventManager.isConnected(), "expected disconnected event manager");
}

static Stream<SQLException> fatalExceptions() {
return Stream.of(
new SQLException("broken", "00000", ISCConstants.isc_net_write_err),
new SQLException("fatal not broken", "00000", ISCConstants.isc_req_sync),
new SQLException(new SocketException()),
new SQLException(new SocketTimeoutException()));
}

/**
* Tests if a default event manager is reported as closed when the underlying {@link FbDatabase} detaches.
* <p>
@@ -392,30 +402,25 @@ void testDefaultEventManagerDisconnectionOnDbClose() throws Exception {
assertFalse(eventManager.isConnected(), "expected disconnected event manager");
}

private class EventWait implements Runnable {
private class EventWait implements Callable<Integer> {

private final String eventName;
private int eventCount;
private final int timeout;

EventWait(String eventName, int timeout) {
this.eventName = eventName;
this.timeout = timeout;
}

public void run() {
public Integer call() {
try {
eventCount = eventManager.waitForEvent(eventName, timeout);
} catch (InterruptedException ie) {
eventCount = 0;
return eventManager.waitForEvent(eventName, timeout);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
throw new RuntimeException(e);
}
}

int getEventCount() {
return this.eventCount;
}
}

private static class AccumulatingEventListener implements EventListener {
@@ -435,15 +440,16 @@ public synchronized void eventOccurred(DatabaseEvent event) {
private record EventProducer(int count) implements Runnable {
@Override
public void run() {
try (Connection conn = getConnectionViaDriverManager();
try (var conn = getConnectionViaDriverManager();
var stmt = conn.prepareStatement("INSERT INTO TEST VALUES (?)")) {
for (int i = 0; i < count; i++) {
stmt.setInt(1, i);
stmt.execute();
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
throw new RuntimeException(e);
}
}
}

0 comments on commit 79b7183

Please sign in to comment.