diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3b1fc83d..a5499d51 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ name: "CI" on: [push, pull_request] env: - FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-2.0-SNAPSHOT-bin-scala_2.12.tgz" + FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-2.1-SNAPSHOT-bin-scala_2.12.tgz" jobs: ci: @@ -27,7 +27,7 @@ jobs: strategy: fail-fast: false matrix: - java_version: [ 8, 11, 17, 21 ] + java_version: [ 17, 21 ] steps: - uses: actions/checkout@v4 - name: "Build images" diff --git a/testing/docker-test-job/pom.xml b/testing/docker-test-job/pom.xml index 6c6185f1..c52eb94b 100644 --- a/testing/docker-test-job/pom.xml +++ b/testing/docker-test-job/pom.xml @@ -27,7 +27,7 @@ under the License. UTF-8 - 1.9.0 + 1.20.0 1.8 2.11 ${java.version} @@ -37,7 +37,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided diff --git a/testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java b/testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java index 4806a438..ee67a652 100644 --- a/testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java +++ b/testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java @@ -18,31 +18,184 @@ package org.apache.flink; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.core.io.InputStatus; +import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; public class StreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.addSource(new InfiniteSource()).map(x -> x); + env.fromSource(new InfiniteSource(), WatermarkStrategy.noWatermarks(), "source") + .map(x -> x); env.execute(); } - private static final class InfiniteSource implements SourceFunction { + /** Infinite source for testing. */ + private static final class InfiniteSource + implements Source { + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public SplitEnumerator createEnumerator( + SplitEnumeratorContext splitEnumeratorContext) throws Exception { + return new NoOpEnumerator(); + } + + @Override + public SplitEnumerator restoreEnumerator( + SplitEnumeratorContext splitEnumeratorContext, + NoOpEnumState noOpEnumState) + throws Exception { + return new NoOpEnumerator(); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new DummySplitSerializer(); + } + + @Override + public SimpleVersionedSerializer getEnumeratorCheckpointSerializer() { + return new NoOpEnumStateSerializer(); + } + + @Override + public SourceReader createReader( + SourceReaderContext sourceReaderContext) throws Exception { + return new InfiniteSourceReader(); + } + } + + /** Reader for {@link InfiniteSource}. */ + private static class InfiniteSourceReader implements SourceReader { + + @Override + public InputStatus pollNext(ReaderOutput readerOutput) throws Exception { + Thread.sleep(20); + return InputStatus.MORE_AVAILABLE; + } + + @Override + public List snapshotState(long l) { + return Collections.singletonList(new DummySplit()); + } + + @Override + public CompletableFuture isAvailable() { + return CompletableFuture.completedFuture(null); + } + + @Override + public void start() { + // no op + } + + @Override + public void addSplits(List list) { + // no op + } + + @Override + public void notifyNoMoreSplits() { + // no op + } + + @Override + public void close() throws Exception { + // no op + } + } + + /** Mock enumerator. */ + private static class NoOpEnumerator implements SplitEnumerator { + @Override + public void start() {} + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + @Override + public void addReader(int subtaskId) {} + + @Override + public NoOpEnumState snapshotState(long checkpointId) throws Exception { + return new NoOpEnumState(); + } + + @Override + public void close() throws IOException {} + } + + /** The split of the {@link InfiniteSource}. */ + private static class DummySplit implements SourceSplit { + public static final String SPLIT_ID = "DummySplitId"; + + @Override + public String splitId() { + return SPLIT_ID; + } + } + + /** Dummy enum state. */ + private static class NoOpEnumState {} - private volatile boolean running = true; + /** Mock enumerator state serializer. */ + private static class NoOpEnumStateSerializer + implements SimpleVersionedSerializer { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(NoOpEnumState obj) throws IOException { + return new byte[0]; + } + + @Override + public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException { + return new NoOpEnumState(); + } + } + + private static class DummySplitSerializer implements SimpleVersionedSerializer { + + @Override + public int getVersion() { + return 0; + } @Override - public void run(SourceContext ctx) throws Exception { - while (running) { - Thread.sleep(20); - } + public byte[] serialize(DummySplit obj) throws IOException { + return new byte[0]; } @Override - public void cancel() { - running = false; + public DummySplit deserialize(int version, byte[] serialized) throws IOException { + return new DummySplit(); } } -} +} \ No newline at end of file