Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37035][release] Update dev-master point to 2.1 #209

Merged
merged 2 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ name: "CI"
on: [push, pull_request]

env:
FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-2.0-SNAPSHOT-bin-scala_2.12.tgz"
FLINK_TAR_URL: "https://s3.amazonaws.com/flink-nightly/flink-2.1-SNAPSHOT-bin-scala_2.12.tgz"

jobs:
ci:
Expand All @@ -27,7 +27,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java_version: [ 8, 11, 17, 21 ]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The uploaded binary release for Flink 2.1 was compiled with JDK 17. It cannot work in a JDK 8/11 environment.

java_version: [ 17, 21 ]
steps:
- uses: actions/checkout@v4
- name: "Build images"
Expand Down
4 changes: 2 additions & 2 deletions testing/docker-test-job/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<flink.version>1.20.0</flink.version>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to 1.20, as 2.0 is not released at the moment.

<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand All @@ -37,7 +37,7 @@ under the License.
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
175 changes: 164 additions & 11 deletions testing/docker-test-job/src/main/java/org/apache/flink/StreamingJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,184 @@

package org.apache.flink;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class StreamingJob {

    /**
     * Entry point: builds a streaming pipeline that reads from an infinite no-op
     * source, applies an identity map, and runs until externally cancelled.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register the source via the unified (FLIP-27) Source API; the legacy
        // SourceFunction-based addSource(...) call was diff residue and is gone.
        env.fromSource(new InfiniteSource(), WatermarkStrategy.noWatermarks(), "source")
                .map(x -> x);
        env.execute();
    }

private static final class InfiniteSource implements SourceFunction<Integer> {
/** Infinite source for testing. */
private static final class InfiniteSource
implements Source<Integer, DummySplit, NoOpEnumState> {
@Override
public Boundedness getBoundedness() {
return Boundedness.CONTINUOUS_UNBOUNDED;
}

@Override
public SplitEnumerator<DummySplit, NoOpEnumState> createEnumerator(
SplitEnumeratorContext<DummySplit> splitEnumeratorContext) throws Exception {
return new NoOpEnumerator();
}

@Override
public SplitEnumerator<DummySplit, NoOpEnumState> restoreEnumerator(
SplitEnumeratorContext<DummySplit> splitEnumeratorContext,
NoOpEnumState noOpEnumState)
throws Exception {
return new NoOpEnumerator();
}

@Override
public SimpleVersionedSerializer<DummySplit> getSplitSerializer() {
return new DummySplitSerializer();
}

@Override
public SimpleVersionedSerializer<NoOpEnumState> getEnumeratorCheckpointSerializer() {
return new NoOpEnumStateSerializer();
}

@Override
public SourceReader<Integer, DummySplit> createReader(
SourceReaderContext sourceReaderContext) throws Exception {
return new InfiniteSourceReader();
}
}

/** Reader for {@link InfiniteSource}. */
private static class InfiniteSourceReader implements SourceReader<Integer, DummySplit> {

@Override
public InputStatus pollNext(ReaderOutput<Integer> readerOutput) throws Exception {
Thread.sleep(20);
return InputStatus.MORE_AVAILABLE;
}

@Override
public List<DummySplit> snapshotState(long l) {
return Collections.singletonList(new DummySplit());
}

@Override
public CompletableFuture<Void> isAvailable() {
return CompletableFuture.completedFuture(null);
}

@Override
public void start() {
// no op
}

@Override
public void addSplits(List<DummySplit> list) {
// no op
}

@Override
public void notifyNoMoreSplits() {
// no op
}

@Override
public void close() throws Exception {
// no op
}
}

    /** Mock enumerator: never assigns splits; every callback is a no-op. */
    private static class NoOpEnumerator implements SplitEnumerator<DummySplit, NoOpEnumState> {
        @Override
        public void start() {} // no background work to start

        @Override
        public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} // requests are ignored

        @Override
        public void addSplitsBack(List<DummySplit> splits, int subtaskId) {} // returned splits are discarded

        @Override
        public void addReader(int subtaskId) {} // readers need no registration

        @Override
        public NoOpEnumState snapshotState(long checkpointId) throws Exception {
            // State is empty, so a fresh instance suffices for any checkpoint.
            return new NoOpEnumState();
        }

        @Override
        public void close() throws IOException {} // no resources held
    }

    /** The split of the {@link InfiniteSource}: a single dummy split with a fixed id. */
    private static class DummySplit implements SourceSplit {
        // All instances share one id — the source has exactly one (empty) partition.
        public static final String SPLIT_ID = "DummySplitId";

        @Override
        public String splitId() {
            return SPLIT_ID;
        }
    }

/** Dummy enum state. */
private static class NoOpEnumState {}

private volatile boolean running = true;
/** Mock enumerator state serializer. */
private static class NoOpEnumStateSerializer
implements SimpleVersionedSerializer<NoOpEnumState> {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(NoOpEnumState obj) throws IOException {
return new byte[0];
}

@Override
public NoOpEnumState deserialize(int version, byte[] serialized) throws IOException {
return new NoOpEnumState();
}
}

private static class DummySplitSerializer implements SimpleVersionedSerializer<DummySplit> {

@Override
public int getVersion() {
return 0;
}

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
Thread.sleep(20);
}
public byte[] serialize(DummySplit obj) throws IOException {
return new byte[0];
}

@Override
public void cancel() {
running = false;
public DummySplit deserialize(int version, byte[] serialized) throws IOException {
return new DummySplit();
}
}
}
}