forked from timothy-barry/simulatr-pipeline
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path main_dsl1.nf
92 lines (69 loc) · 2.59 KB
/
main_dsl1.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
nextflow.enable.dsl=1

// Required parameter: path to the simulatr specifier. It is passed as the first
// argument to every R script invoked by the processes in this pipeline.
// Declared with a null default so the check below can report it cleanly;
// a value given on the command line (--simulatr_specifier_fp) overrides it.
params.simulatr_specifier_fp = null

// The optional parameters
params.result_dir = "."                          // directory the final result file is published to
params.result_file_name = "simulatr_result.rds"  // file name of the final evaluation result
params.B = 0                                     // forwarded to run_benchmark.R and run_simulation_chunk.R
params.B_check = 5                               // forwarded to run_benchmark.R
params.max_gb = 8                                // memory (GB) per simulation chunk; also forwarded to run_benchmark.R
params.max_hours = 4                             // wall time (hours) per simulation chunk; also forwarded to run_benchmark.R

// Fail fast: without this check a missing specifier is interpolated as the literal
// string "null" into every task's command line and only fails inside the R scripts.
if (!params.simulatr_specifier_fp) {
    exit 1, "Missing required parameter: --simulatr_specifier_fp"
}
// Step 1: ask the simulatr specifier for its method names and grid row IDs.
// get_info_for_nextflow.R writes each list to its own text file; the two files
// are emitted on separate channels and split line-by-line further down.
process obtain_basic_info {
    time '15m'
    memory '2GB'

    output:
    path "method_names.txt" into method_names_raw_ch
    path "grid_rows.txt" into grid_rows_raw_ch

    script:
    """
    get_info_for_nextflow.R ${params.simulatr_specifier_fp}
    """
}
// Split each info file into one item per line, stripping surrounding whitespace
// (including the trailing newline that splitText keeps on every line).
method_names_ch = method_names_raw_ch
    .splitText()
    .map { line -> line.trim() }
grid_rows_ch = grid_rows_raw_ch
    .splitText()
    .map { line -> line.trim() }
// combine without a `by` key forms the Cartesian product: every method is paired
// with every grid row, yielding [method, grid_row] tuples for run_benchmark.
method_cross_grid_row_ch = method_names_ch.combine(grid_rows_ch)
// Step 2: benchmark time and memory usage for one (method, grid row) pair.
// Produces a CSV whose rows are read by run_simulation_chunk as
// (method, grid_row, proc_id, n_processors), plus an RDS of benchmarking info
// that is gathered by evaluate_methods.
process run_benchmark {
    tag "method: $method; grid row: $grid_row"
    time '2h'
    memory '4GB'

    input:
    tuple val(method), val(grid_row) from method_cross_grid_row_ch

    output:
    path "proc_id_info_${method}_${grid_row}.csv" into proc_id_info_ch
    path "benchmarking_info_${method}_${grid_row}.rds" into benchmarking_info_ch

    script:
    """
    run_benchmark.R ${params.simulatr_specifier_fp} ${method} ${grid_row} ${params.B_check} ${params.B} ${params.max_gb} ${params.max_hours}
    """
}
// Step 3: run one chunk of the simulation, i.e. apply a method to some number of
// realizations from a grid row. One task is launched per CSV row emitted by
// run_benchmark; resources come from the max_gb / max_hours parameters.
process run_simulation_chunk {
    tag "method: $method; grid row: $grid_row; processor: $proc_id"
    time "${params.max_hours} h"
    memory "${params.max_gb} GB"

    input:
    tuple val(method), val(grid_row), val(proc_id), val(n_processors) from proc_id_info_ch.splitCsv()

    output:
    path "chunk_result_${method}_${grid_row}_${proc_id}.rds" into results_ch

    script:
    """
    run_simulation_chunk.R ${params.simulatr_specifier_fp} ${method} ${grid_row} ${proc_id} ${n_processors} ${params.B}
    """
}
// Fourth, collect all chunk results and benchmarking info and evaluate the methods.
// The final result file is copied into params.result_dir.
process evaluate_methods {
    // Retry up to 6 times, but only for failures that look like resource-limit kills.
    // 137 is SIGKILL (128 + 9), typically an out-of-memory kill; 138-140 cover codes
    // that executors may report for wall-time kills (e.g. 140). The range mirrors the
    // Nextflow dynamic-resources example -- confirm the exact code for your executor.
    // Previously only 137 was retried, so the time doubling below never took effect
    // for tasks killed by a time limit. Any other failure terminates the run.
    maxRetries 6
    errorStrategy { task.exitStatus in 137..140 ? 'retry' : 'terminate' }
    // Double the memory on each attempt: 6, 12, 24, ... GB.
    memory { (Math.pow(2, task.attempt - 1) * 6).toInteger() + 'GB' }
    // Double the time on each attempt: 15, 30, 60, ... minutes.
    time { (Math.pow(2, task.attempt - 1) * 15).toInteger() + 'm' }
    publishDir params.result_dir, mode: "copy"

    input:
    // All files are collected into one task. When a fixed name receives a collection,
    // Nextflow appends an index to it, hence the chunk_result* / benchmarking_info*
    // globs in the script.
    path 'chunk_result' from results_ch.collect()
    path 'benchmarking_info' from benchmarking_info_ch.collect()

    output:
    path "$params.result_file_name" into final_results_ch

    script:
    """
    run_evaluation.R $params.simulatr_specifier_fp $params.result_file_name chunk_result* benchmarking_info*
    """
}