#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
name: Docker example
on:
  workflow_call:
    inputs:
      java-version:
        default: '8'
        required: false
        type: string
      jdk-distro:
        default: 'temurin'
        required: false
        type: string
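# Note: this workflow is only triggered via workflow_call, i.e. it is meant to be
# reused from another workflow. A minimal caller sketch (hypothetical job id, and
# assuming this file lives at .github/workflows/docker.yml) could look like:
#
#   jobs:
#     docker-example:
#       uses: ./.github/workflows/docker.yml
#       with:
#         java-version: '8'
#         jdk-distro: 'temurin'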
jobs:
  docker:
    runs-on: ubuntu-20.04
    name: speculate failing tasks
    steps:
      - name: Checkout project
        uses: actions/checkout@v3
      - name: Set up JDK ${{ inputs.java-version }}
        uses: actions/setup-java@v3
        with:
          java-version: ${{ inputs.java-version }}
          distribution: ${{ inputs.jdk-distro }}
      - name: Cache local Maven repository
        uses: actions/cache@v3
        with:
          path: ~/.m2/repository
          key: mvn-${{ inputs.java-version }}-docker-${{ hashFiles('**/pom.xml') }}
          restore-keys: |
            mvn-${{ inputs.java-version }}-docker-
      - name: Build the docker images
        run: ./deploy/docker/build.sh
        shell: bash
      - name: Start the docker cluster
        id: up
        run: |
          docker compose -f deploy/docker/docker-compose.yml up --wait --wait-timeout 60 \
            --scale coordinator=2 --scale shuffle-server=4 --scale spark-worker=5
          # check that all rss containers are up (expecting 6 healthy: 2 coordinators + 4 shuffle servers)
          healthy="$(docker container ls | grep rss-server-example | grep "(healthy)" | wc -l)"
          if [ "$healthy" == "6" ]
          then
            echo "All RSS containers up"
          else
            echo "::error::Could not bring up Docker cluster"
            exit 1
          fi
        shell: bash
      - name: Prepare example Spark app
        run: |
          cat << EOL > example.scala
          import org.apache.spark.TaskContext
          import org.apache.spark.sql.SaveMode
          // fails iteration (at the end) or delays iteration (each element)
          // a failing task negates its iterator values; shuffle data of a failing task must not leak into the next stage
          case class FaultyIterator(it: Iterator[java.lang.Long], fail: Boolean, sleep: Option[Int]) extends Iterator[java.lang.Long] {
            override def hasNext: Boolean = it.hasNext || fail
            override def next(): java.lang.Long = {
              // delay iteration if requested
              if (sleep.isDefined) {
                val start = System.nanoTime()
                while (start + sleep.get >= System.nanoTime()) { }
              }
              // fail at the end if requested
              if (fail && !it.hasNext) throw new RuntimeException()
              // just iterate
              if (fail) {
                -it.next()
              } else {
                it.next()
              }
            }
          }
          spark.range(0, 10000000, 1, 100) \
            .mapPartitions { it =>
              val ctx = TaskContext.get();
              FaultyIterator(
                it,
                // we fail the task of partition 2 three times
                (ctx.partitionId == 2 && ctx.attemptNumber < 3),
                // and delay its fourth attempt (attemptNumber >= 3) so we see a speculative execution
                Some(ctx.partitionId == 2 && ctx.attemptNumber >= 3).filter(v => v).map(_ => 250000)
              )
            } \
            .groupBy(($"value" / 1000000).cast("int")) \
            .as[Long, Long] \
            .mapGroups{(id, it) => (id, it.length)} \
            .sort("_1") \
            .write \
            .mode(SaveMode.Overwrite) \
            .csv("/shared/result.csv")
          EOL
          docker cp example.scala rss-spark-master-1:/
        shell: bash
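      # The spark-shell invocation below swaps in the RSS shuffle manager and points it
      # at both coordinator containers. spark.task.maxFailures=4 leaves room for the
      # three injected failures of partition 2, and spark.speculation=true lets the
      # delayed fourth attempt produce a speculative copy.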
      - name: Run example Spark app
        run: |
          docker exec rss-spark-master-1 /bin/bash -c "cat /example.scala | /opt/spark/bin/spark-shell \
            --master spark://rss-spark-master-1:7077 \
            --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
            --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager \
            --conf spark.rss.coordinator.quorum=rss-coordinator-1:19999,rss-coordinator-2:19999 \
            --conf spark.rss.storage.type=MEMORY_LOCALFILE \
            --conf spark.task.maxFailures=4 \
            --conf spark.speculation=true"
        shell: bash
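      # Expected result: range(0, 10000000) grouped by value / 1000000 gives ten buckets
      # of exactly 1,000,000 values each. If shuffle data from a failed (negated) attempt
      # leaked into the next stage, the counts would differ and the diff below would fail.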
      - name: Assert result
        run: |
          docker exec rss-spark-master-1 bash -c "cat /shared/result.csv/*.csv" > ./result.csv
          cat << EOL | diff -y - result.csv
          0,1000000
          1,1000000
          2,1000000
          3,1000000
          4,1000000
          5,1000000
          6,1000000
          7,1000000
          8,1000000
          9,1000000
          EOL
        shell: bash
      - name: Stop the docker cluster
        # tear down only when the cluster actually came up (see the "up" step id above)
        if: always() && steps.up.outcome == 'success'
        run: docker compose -f deploy/docker/docker-compose.yml down
        shell: bash
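# For local debugging, a rough equivalent of this job can be run by hand (a sketch,
# assuming the same repository layout as above):
#
#   ./deploy/docker/build.sh
#   docker compose -f deploy/docker/docker-compose.yml up --wait \
#     --scale coordinator=2 --scale shuffle-server=4 --scale spark-worker=5
#   # ... run the "Run example Spark app" command from above ...
#   docker compose -f deploy/docker/docker-compose.yml down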