Skip to content

Commit

Permalink
append
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Feb 7, 2025
1 parent 025863b commit 9d5fbd0
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 14 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java_version: [ 11, 17, 21 ]
java_version: [ 17, 21 ]
steps:
- uses: actions/checkout@v4
- name: "Build images"
Expand Down
4 changes: 2 additions & 2 deletions testing/docker-test-job/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<flink.version>1.20.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand All @@ -37,7 +37,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
175 changes: 164 additions & 11 deletions testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,184 @@

package org.apache.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class StreamingJob {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new InfiniteSource()).map(x -> x);
env.fromSource(new InfiniteSource(), WatermarkStrategy.noWatermarks(), "source")
.map(x -> x);
env.execute();
}

private static final class InfiniteSource implements SourceFunction<Integer> {
/** Infinite source for testing. */
private static final class InfiniteSource
implements Source<Integer, DummySplit, NoOpEnumState> {
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SplitEnumerator<DummySplit, NoOpEnumState> createEnumerator(
SplitEnumeratorContext<DummySplit> splitEnumeratorContext) throws Exception {
return new NoOpEnumerator();
}

@Override
public SplitEnumerator<DummySplit, NoOpEnumState> restoreEnumerator(
SplitEnumeratorContext<DummySplit> splitEnumeratorContext,
NoOpEnumState noOpEnumState)
throws Exception {
return new NoOpEnumerator();
}

@Override
public SimpleVersionedSerializer<DummySplit> getSplitSerializer() {
return new DummySplitSerializer();
}

@Override
public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
return new NoOpEnumStateSerializer();
}

@Override
public SourceReader<Integer, DummySplit> createReader(
SourceReaderContext sourceReaderContext) throws Exception {
return new InfiniteSourceReader();
}
}

/** Reader for {@link InfiniteSource}. */
private static class InfiniteSourceReader implements SourceReader<Integer, DummySplit> {

@Override
public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
Thread.sleep(20);
return InputStatus.MORE_AVAILABLE;
}

@Override
public List<DummySplit> snapshotState(long l) {
return Collections.singletonList(new DummySplit());
}

@Override
public CompletableFuture<Void> isAvailable() {
return CompletableFuture.completedFuture(null);
}

@Override
public void start() {
// no op
}

@Override
public void addSplits(List<DummySplit> list) {
// no op
}

@Override
public void notifyNoMoreSplits() {
// no op
}

@Override
public void close() throws Exception {
// no op
}
}

/** Mock enumerator. */
private static class NoOpEnumerator implements SplitEnumerator<DummySplit, NoOpEnumState> {
@Override
public void start() {}

@Override
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}

@Override
public void addSplitsBack(List<DummySplit> splits, int subtaskId) {}

@Override
public void addReader(int subtaskId) {}

@Override
public NoOpEnumState snapshotState(long checkpointId) throws Exception {
return new NoOpEnumState();
}

@Override
public void close() throws IOException {}
}

/** The split of the {@link InfiniteSource}. */
private static class DummySplit implements SourceSplit {
public static final String SPLIT_ID = "DummySplitId";

@Override
public String splitId() {
return SPLIT_ID;
}
}

/** Dummy enum state. */
private static class NoOpEnumState {}

private volatile boolean running = true;
/** Mock enumerator state serializer. */
private static class NoOpEnumStateSerializer
implements SimpleVersionedSerializer<NoOpEnumState> {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(NoOpEnumState obj) throws IOException {
return new byte[0];
}

@Override
public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
return new NoOpEnumState();
}
}

private static class DummySplitSerializer implements SimpleVersionedSerializer<DummySplit> {

@Override
public int getVersion() {
return 0;
}

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
Thread.sleep(20);
}
public byte[] serialize(DummySplit obj) throws IOException {
return new byte[0];
}

@Override
public void cancel() {
running = false;
public DummySplit deserialize(int version, byte[] serialized) throws IOException {
return new DummySplit();
}
}
}
}

0 comments on commit 9d5fbd0

Please sign in to comment.