diff --git a/.JuliaFormatter.toml b/.JuliaFormatter.toml new file mode 100644 index 0000000..fb83024 --- /dev/null +++ b/.JuliaFormatter.toml @@ -0,0 +1,8 @@ +style = "yas" +align_assignment = true +whitespace_in_kwargs = true +whitespace_ops_in_indices = false +pipe_to_function_call = false +align_struct_field = true +align_pair_arrow = true +align_conditional = true diff --git a/.github/workflows/TagBot.yml b/.github/workflows/TagBot.yml new file mode 100644 index 0000000..f389611 --- /dev/null +++ b/.github/workflows/TagBot.yml @@ -0,0 +1,20 @@ +name: TagBot +on: + issue_comment: + types: + - created + workflow_dispatch: + inputs: + lookback: + default: 3 +permissions: + contents: write +jobs: + TagBot: + if: github.event_name == 'workflow_dispatch' || github.actor == 'JuliaTagBot' + runs-on: ubuntu-latest + steps: + - uses: JuliaRegistries/TagBot@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + ssh: ${{ secrets.DOCUMENTER_KEY }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1d1d02a --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +Manifest.toml diff --git a/deps/build.jl b/deps/build.jl index 1803465..931c149 100644 --- a/deps/build.jl +++ b/deps/build.jl @@ -10,7 +10,7 @@ paths = ["jobs", "storage/servers", "storage/execs", "storage/environments"] -for p in paths +for p in paths mkpath(config_path(p)) end @@ -27,26 +27,31 @@ function configure_local() if !ispath(spath) scheduler = nothing if haskey(ENV, "DFC_SCHEDULER") - sched = ENV["DFC_SCHEDULER"] if occursin("hq", lowercase(sched)) cmd = get(ENV, "DFC_SCHEDULER_CMD", "hq") - scheduler = (type="hq", server_command=cmd, allocs=String[]) + scheduler = (type = "hq", server_command = cmd, allocs = String[]) elseif lowercase(sched) == "slurm" - scheduler = (type="slurm",) + scheduler = (type = "slurm",) else error("Scheduler $sched not recognized please set a different DFC_SCHEDULER environment var.") end end - + for t in ("hq", "sbatch") if Sys.which(t) !== nothing - scheduler = t == "hq" ? (type="hq", server_command="hq", allocs=String[]) : (type="slurm",) + scheduler = t == "hq" ? + (type = "hq", server_command = "hq", allocs = String[]) : + (type = "slurm",) end end - scheduler = scheduler === nothing ? (type="bash", ) : scheduler + scheduler = scheduler === nothing ? (type = "bash",) : scheduler user = get(ENV, "USER", "noname") - JSON3.write(spath, (name=host, username=user, domain="localhost", julia_exec=joinpath(Sys.BINDIR, "julia"), scheduler=scheduler, port=8080, local_port=0, root_jobdir=homedir(), max_concurrent_jobs=100, uuid=string(uuid4()))) + JSON3.write(spath, + (name = host, username = user, domain = "localhost", + julia_exec = joinpath(Sys.BINDIR, "julia"), scheduler = scheduler, + port = 8080, local_port = 0, root_jobdir = homedir(), + max_concurrent_jobs = 100, uuid = string(uuid4()))) end end configure_local() diff --git a/docs/make.jl b/docs/make.jl index 0ca1a1c..ca19f87 100644 --- a/docs/make.jl +++ b/docs/make.jl @@ -1,11 +1,9 @@ using Documenter using RemoteHPC -makedocs( - sitename = "RemoteHPC", - format = Documenter.HTML(), - modules = [RemoteHPC] -) +makedocs(; sitename = "RemoteHPC", + format = Documenter.HTML(), + modules = [RemoteHPC]) # Documenter can also automatically deploy documentation to gh-pages. # See "Hosting Documentation" and deploydocs() in the Documenter manual diff --git a/src/RemoteHPC.jl b/src/RemoteHPC.jl index 713f7e1..145909a 100644 --- a/src/RemoteHPC.jl +++ b/src/RemoteHPC.jl @@ -30,15 +30,16 @@ include("schedulers.jl") include("servers.jl") include("runtime.jl") include("api.jl") -include("client.jl") +include("client.jl") include("io.jl") @precompile_all_calls begin s = local_server() t = "asdfe" t2 = "edfasdf" - e = Exec(name = t2, exec="srun") - e1 = Environment(t, Dict("-N" => 3, "partition" => "default", "time" => "00:01:01"), Dict("OMP_NUM_THREADS" => 1), "", "", e) + e = Exec(; name = t2, exec = "srun") + e1 = Environment(t, Dict("-N" => 3, "partition" => "default", "time" => "00:01:01"), + Dict("OMP_NUM_THREADS" => 1), "", "", e) save(e1) save(e) e1 = load(e1) diff --git a/src/api.jl b/src/api.jl index 54c8949..d506a69 100644 --- a/src/api.jl +++ b/src/api.jl @@ -16,7 +16,7 @@ get_server_config(req) = local_server() api_ispath(req::HTTP.Request) = ispath(path(req)) api_read(req::HTTP.Request) = read(path(req)) api_write(req::HTTP.Request) = write(path(req), req.body) -api_rm(req::HTTP.Request) = rm(path(req), recursive=true) +api_rm(req::HTTP.Request) = rm(path(req); recursive = true) api_symlink(req::HTTP.Request) = symlink(JSON3.read(req.body, Vector{String})...) api_readdir(req::HTTP.Request) = readdir(path(req)) api_mtime(req::HTTP.Request) = mtime(path(req)) @@ -45,7 +45,7 @@ function setup_core_api!(router::HTTP.Router) HTTP.register!(router, "GET", "/api/**", execute_function) HTTP.register!(router, "POST", "/write/**", api_write) HTTP.register!(router, "POST", "/rm/**", api_rm) - HTTP.register!(router, "POST", "/symlink/", api_symlink) + return HTTP.register!(router, "POST", "/symlink/", api_symlink) end submit_job(req, channel) = put!(channel, path(req)) @@ -55,8 +55,10 @@ function get_job(job_dir::AbstractString, queue::Queue) if info === nothing info = get(queue.info.full_queue, job_dir, nothing) end - - return (info, JSON3.read(read(joinpath(job_dir, ".remotehpc_info")), Tuple{String, Environment, Vector{Calculation}})...) + + return (info, + JSON3.read(read(joinpath(job_dir, ".remotehpc_info")), + Tuple{String,Environment,Vector{Calculation}})...) end function get_jobs(state::JobState, queue::Queue) @@ -83,20 +85,21 @@ function get_jobs(dirfuzzy::AbstractString, queue::Queue) return jobs end -save_job(req::HTTP.Request, args...) = - save_job(path(req), - JSON3.read(req.body, Tuple{String, Environment, Vector{Calculation}}), - args...) +function save_job(req::HTTP.Request, args...) + return save_job(path(req), + JSON3.read(req.body, Tuple{String,Environment,Vector{Calculation}}), + args...) +end -function save_job(dir::AbstractString, job_info::Tuple, queue::Queue, sched::Scheduler) +function save_job(dir::AbstractString, job_info::Tuple, queue::Queue, sched::Scheduler) # Needs to be done so the inputs `dir` also changes. mkpath(dir) open(joinpath(dir, "job.sh"), "w") do f - write(f, job_info, sched) + return write(f, job_info, sched) end - JSON3.write(joinpath(dir, ".remotehpc_info"), job_info) + JSON3.write(joinpath(dir, ".remotehpc_info"), job_info) lock(queue) do q - q.full_queue[dir] = Job(-1, Saved) + return q.full_queue[dir] = Job(-1, Saved) end end @@ -110,21 +113,23 @@ function abort(req::HTTP.Request, queue::Queue, sched::Scheduler) lock(queue) do q j = pop!(q.current_queue, jdir) j.state = Cancelled - q.full_queue[jdir] = j + return q.full_queue[jdir] = j end - + return j.id end -function setup_job_api!(router::HTTP.Router, submit_channel, queue::Queue, scheduler::Scheduler) +function setup_job_api!(router::HTTP.Router, submit_channel, queue::Queue, + scheduler::Scheduler) HTTP.register!(router, "POST", "/job/**", (req) -> save_job(req, queue, scheduler)) HTTP.register!(router, "PUT", "/job/**", (req) -> submit_job(req, submit_channel)) HTTP.register!(router, "GET", "/job/**", (req) -> get_job(path(req), queue)) HTTP.register!(router, "GET", "/jobs/state", - (req) -> get_jobs(JSON3.read(req.body, JobState), queue)) + (req) -> get_jobs(JSON3.read(req.body, JobState), queue)) HTTP.register!(router, "GET", "/jobs/fuzzy", - (req) -> get_jobs(JSON3.read(req.body, String), queue)) - HTTP.register!(router, "POST", "/abort/**", (req) -> abort(req, queue, scheduler)) + (req) -> get_jobs(JSON3.read(req.body, String), queue)) + return HTTP.register!(router, "POST", "/abort/**", + (req) -> abort(req, queue, scheduler)) end function load(req::HTTP.Request) @@ -137,13 +142,13 @@ function load(req::HTTP.Request) try return load(val) catch - return map(x->storage_name(x), replacements(val)) + return map(x -> storage_name(x), replacements(val)) end else - cpath = config_path(p) + cpath = config_path(p) if isempty(splitext(p)[end]) # Here we return the possibilities - return map(x->splitext(x)[1], readdir(cpath)) + return map(x -> splitext(x)[1], readdir(cpath)) else return read(cpath, String) end @@ -165,7 +170,7 @@ end function database_rm(req) p = config_path(path(req)) ispath(p) - rm(p) + return rm(p) end function name(req) @@ -178,6 +183,5 @@ function setup_database_api!(router) HTTP.register!(router, "GET", "/database/storage/**", load) HTTP.register!(router, "POST", "/database/storage/**", save) HTTP.register!(router, "PUT", "/database/storage/**", database_rm) - HTTP.register!(router, "GET", "/database/name", name) + return HTTP.register!(router, "GET", "/database/name", name) end - diff --git a/src/client.jl b/src/client.jl index f50d602..4935a02 100644 --- a/src/client.jl +++ b/src/client.jl @@ -1,6 +1,8 @@ function HTTP.request(method::String, s::Server, url, body; kwargs...) - header = ["Type" => replace("$(typeof(body))", "RemoteHPC."=>""), "USER-UUID" => s.uuid] - return HTTP.request(method, string(http_string(s), url), header, JSON3.write(body); kwargs...) + header = ["Type" => replace("$(typeof(body))", "RemoteHPC." => ""), + "USER-UUID" => s.uuid] + return HTTP.request(method, string(http_string(s), url), header, JSON3.write(body); + kwargs...) end function HTTP.request(method::String, s::Server, url, body::Vector{UInt8}; kwargs...) @@ -8,10 +10,12 @@ function HTTP.request(method::String, s::Server, url, body::Vector{UInt8}; kwarg return HTTP.request(method, string(http_string(s), url), header, body; kwargs...) end -function HTTP.request(method::String, s::Server, url; connect_timeout=1, retries=2, kwargs...) +function HTTP.request(method::String, s::Server, url; connect_timeout = 1, retries = 2, + kwargs...) header = ["USER-UUID" => s.uuid] - - return HTTP.request(method, string(http_string(s), url), header; connect_timeout=connect_timeout, retries=retries, kwargs...) + + return HTTP.request(method, string(http_string(s), url), header; + connect_timeout = connect_timeout, retries = retries, kwargs...) end for f in (:get, :put, :post, :head, :patch) @@ -38,7 +42,7 @@ function start(s::Server) alive = isalive(s) end @assert !alive "Server is already up and running." - + @info "Starting:\n$s" hostname = gethostname(s) if islocal(s) @@ -47,25 +51,24 @@ function start(s::Server) cmd = "cat ~/.julia/config/RemoteHPC/$hostname/self_destruct" t = server_command(s, cmd).exitcode == 0 end - + @assert !t "Self destruction was previously triggered, signalling issues on the Server.\nPlease investigate and if safe, remove ~/.julia/config/RemoteHPC/self_destruct" - # Here we clean up previous connections and commands if !islocal(s) if s.local_port != 0 destroy_tunnel(s) end - + t = deepcopy(s) t.domain = "localhost" t.local_port = 0 t.name = hostname tf = tempname() - JSON3.write(tf, t) + JSON3.write(tf, t) push(tf, s, "~/.julia/config/RemoteHPC/$hostname/storage/servers/$hostname.json") end - + # Here we check what the modify time of the server-side localhost file is. # The server will rewrite the file with the correct port, which we use to see # whether the server started succesfully. @@ -84,19 +87,21 @@ function start(s::Server) p = "~/.julia/config/RemoteHPC/$hostname/logs/errors.log" scrpt = "using RemoteHPC; RemoteHPC.julia_main()" if s.domain != "localhost" - julia_cmd = replace("""$(s.julia_exec) --startup-file=no -t 10 -e "using RemoteHPC; RemoteHPC.julia_main()" &> $p""", "'" => "") - run(Cmd(`ssh -f $(ssh_string(s)) $julia_cmd`, detach=true)) + julia_cmd = replace("""$(s.julia_exec) --startup-file=no -t 10 -e "using RemoteHPC; RemoteHPC.julia_main()" &> $p""", + "'" => "") + run(Cmd(`ssh -f $(ssh_string(s)) $julia_cmd`; detach = true)) else e = s.julia_exec - julia_cmd = Cmd([string.(split(e))..., "--startup-file=no", "-t", "auto", "-e", scrpt, "&>", p, "&"]) - run(Cmd(julia_cmd, detach=true), wait=false) + julia_cmd = Cmd([string.(split(e))..., "--startup-file=no", "-t", "auto", "-e", + scrpt, "&>", p, "&"]) + run(Cmd(julia_cmd; detach = true); wait = false) end - + #TODO: little hack here retries = 0 - prog = ProgressUnknown( "Waiting for server bootup:", spinner=true) + prog = ProgressUnknown("Waiting for server bootup:"; spinner = true) while checktime() <= firstime && retries < 60 - ProgressMeter.next!(prog; spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏", showvalues=[(:try, retries)]) + ProgressMeter.next!(prog; spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏", showvalues = [(:try, retries)]) retries += 1 sleep(1) end @@ -104,7 +109,6 @@ function start(s::Server) if retries == 60 error("Something went wrong starting the server.") else - tserver = load_config(s) s.port = tserver.port if s.local_port == 0 @@ -144,12 +148,12 @@ end function update_config(s::Server) alive = isalive(s) - if alive + if alive @info "Server is alive, killing" kill(s) end save(s) - start(s) + return start(s) end """ @@ -160,7 +164,7 @@ the return is `false`. """ function isalive(s::Server) alive = try - HTTP.get(s, "/isalive", connect_timeout=2, retries=2) !== nothing + HTTP.get(s, "/isalive"; connect_timeout = 2, retries = 2) !== nothing catch false end @@ -171,7 +175,7 @@ function isalive(s::Server) finally construct_tunnel(s) try - return HTTP.get(s, "/isalive", connect_timeout=2, retries=2) !== nothing + return HTTP.get(s, "/isalive"; connect_timeout = 2, retries = 2) !== nothing catch return false end @@ -180,7 +184,8 @@ function isalive(s::Server) return false end -function save(s::Server, dir::AbstractString, e::Environment, calcs::Vector{Calculation}; name = "RemoteHPC_job") +function save(s::Server, dir::AbstractString, e::Environment, calcs::Vector{Calculation}; + name = "RemoteHPC_job") adir = abspath(s, dir) HTTP.post(s, "/job/" * adir, (name, e, calcs)) return adir @@ -193,8 +198,10 @@ function load(s::Server, dir::AbstractString) return JSON3.read(resp.body, Vector{String}) else resp = HTTP.get(s, "/job/" * adir) - info, name, environment, calculations = JSON3.read(resp.body, Tuple{Job, String, Environment, Vector{Calculation}}) - return (;info, name, environment, calculations) + info, name, environment, calculations = JSON3.read(resp.body, + Tuple{Job,String,Environment, + Vector{Calculation}}) + return (; info, name, environment, calculations) end end function load(s::Server, state::JobState) @@ -204,9 +211,10 @@ end function submit(s::Server, dir::AbstractString) adir = abspath(s, dir) - HTTP.put(s, "/job/" * adir) + return HTTP.put(s, "/job/" * adir) end -function submit(s::Server, dir::AbstractString, e::Environment, calcs::Vector{Calculation}; kwargs...) +function submit(s::Server, dir::AbstractString, e::Environment, calcs::Vector{Calculation}; + kwargs...) adir = save(s, dir, e, calcs; kwargs...) submit(s, adir) return adir diff --git a/src/database.jl b/src/database.jl index 407bf94..6f66ea9 100644 --- a/src/database.jl +++ b/src/database.jl @@ -1,9 +1,15 @@ abstract type Storable end -storage_name(s::S) where {S<:Storable} = hasfield(S, :name) ? s.name : error("Please define a name function for type $S.") -storage_directory(s::S) where {S<:Storable} = error("Please define a storage_directory function for type $S.") +function storage_name(s::S) where {S<:Storable} + return hasfield(S, :name) ? s.name : error("Please define a name function for type $S.") +end +function storage_directory(s::S) where {S<:Storable} + return error("Please define a storage_directory function for type $S.") +end -storage_path(s::S) where {S<:Storable} = joinpath("storage", storage_directory(s), storage_name(s) * ".json") +function storage_path(s::S) where {S<:Storable} + return joinpath("storage", storage_directory(s), storage_name(s) * ".json") +end storage_url(s::Storable) = joinpath("/database", storage_path(s)) verify(s::Storable) = nothing @@ -23,11 +29,11 @@ function save(s::Storable) @warn "Overwriting previously existing item at $p." end verify(s) - JSON3.write(p, s) + return JSON3.write(p, s) end function save(server, s::Storable) url = storage_url(s) - HTTP.post(server, url, s) + return HTTP.post(server, url, s) end """ @@ -37,20 +43,20 @@ end Loads a previously stored item from the `server`. If `server` is not specified the local item is loaded. """ -function load(s::S) where {S <: Storable} +function load(s::S) where {S<:Storable} p = config_path(storage_path(s)) if !ispath(p) error("No item found at $p.") end - JSON3.read(read(p, String), S) + return JSON3.read(read(p, String), S) end -function load(server, s::S) where {S <: Storable} +function load(server, s::S) where {S<:Storable} url = storage_url(s) if isempty(s.name) if s == S() # asking for all possibilities return JSON3.read(HTTP.get(server, splitdir(url)[1]).body, Vector{String}) else - n = name(server, s) + n = name(server, s) if n !== nothing # stored item with matching data existed, just replace name s.name = n return s @@ -61,7 +67,7 @@ function load(server, s::S) where {S <: Storable} end elseif exists(server, s) # asking for a stored item return JSON3.read(JSON3.read(HTTP.get(server, url).body, String), S) - + else @warn "No exact match found. Returning the closest options..." return JSON3.read(HTTP.get(server, url, s).body, Vector{String}) @@ -97,7 +103,7 @@ function replacements(s::S) where {S<:Storable} end end end - p = sortperm(score, rev=true) + p = sortperm(score; rev = true) best = maximum(score) return all[p][1:length(findall(isequal(best), score))] end @@ -105,7 +111,7 @@ end function name(s::S) where {S<:Storable} dir = config_path("storage", storage_directory(s)) for f in readdir(dir) - t = JSON3.read(read(joinpath(dir, f), String), S) + t = JSON3.read(read(joinpath(dir, f), String), S) if t == s return t.name end @@ -132,10 +138,10 @@ function Base.rm(s::Storable) if !ispath(p) error("No item found at $p.") end - rm(p) + return rm(p) end - + function Base.rm(server, s::Storable) - url = storage_url(s) - HTTP.put(server, url) + url = storage_url(s) + return HTTP.put(server, url) end diff --git a/src/io.jl b/src/io.jl index 45c5bc0..65cd07f 100644 --- a/src/io.jl +++ b/src/io.jl @@ -1,6 +1,6 @@ function flagstring(f, v) out = "" - if f[1]!='-' + if f[1] != '-' if length(f) == 1 out *= "-$f " else @@ -15,7 +15,7 @@ function flagstring(f, v) end if !(v isa AbstractString) && length(v) > 1 for v_ in v - out *="$v_ " + out *= "$v_ " end else out *= "$v " @@ -33,7 +33,7 @@ end function Base.write(io::IO, c::Calculation) write(io, c.exec) - write(io, " $(c.args)") + return write(io, " $(c.args)") end function write_exports_string(io::IO, e::Environment) @@ -43,24 +43,24 @@ function write_exports_string(io::IO, e::Environment) end function write_preamble_string(io::IO, e::Environment, sched::Scheduler) - for (f, v) in e.directives + for (f, v) in e.directives write(io, "$(directive_prefix(sched)) $(flagstring(f, v))\n") end - write(io, "$(e.preamble)\n") + return write(io, "$(e.preamble)\n") end write_postamble_string(io::IO, e::Environment) = write(io, "$(e.postamble)\n") function Base.write(io::IO, job_info::Tuple, sched::Scheduler) name, e, calcs = job_info - + write(io, "#!/bin/bash\n") write(io, "# Generated by RemoteHPC\n") write(io, "$(name_directive(sched)) $name\n") - + write_preamble_string(io, e, sched) write_exports_string(io, e) - + modules = String[] for c in calcs for m in c.exec.modules @@ -72,11 +72,11 @@ function Base.write(io::IO, job_info::Tuple, sched::Scheduler) if !isempty(modules) write(io, "module load $(join(modules, " "))\n") end - + for c in calcs #TODO: Put all of this in write(io, c) write(io, c.run ? "" : "#") - if c.exec.parallel + if c.exec.parallel write(io, e.parallel_exec) names = "$(e.parallel_exec.name) $(c.exec.name)" else @@ -85,6 +85,6 @@ function Base.write(io::IO, job_info::Tuple, sched::Scheduler) write(io, c) write(io, "\n") end - - write_postamble_string(io, e) + + return write_postamble_string(io, e) end diff --git a/src/runtime.jl b/src/runtime.jl index 43afa4b..978890c 100644 --- a/src/runtime.jl +++ b/src/runtime.jl @@ -4,8 +4,8 @@ mutable struct Job end Base.@kwdef mutable struct QueueInfo - full_queue::Dict{String, Job} = Dict{String, Job}() - current_queue::Dict{String, Job} = Dict{String, Job}() + full_queue::Dict{String,Job} = Dict{String,Job}() + current_queue::Dict{String,Job} = Dict{String,Job}() submit_queue::Vector{String} = String[] end StructTypes.StructType(::Type{QueueInfo}) = StructTypes.Mutable() @@ -37,7 +37,7 @@ function Base.fill!(qu::Queue, s::Scheduler, init) lock(qu) do q copy!(q.full_queue, tq.full_queue) copy!(q.current_queue, tq.current_queue) - copy!(q.submit_queue, tq.submit_queue) + return copy!(q.submit_queue, tq.submit_queue) end end end @@ -64,7 +64,7 @@ function Base.fill!(qu::Queue, s::Scheduler, init) end else squeue = queue(s) - lock(qu) do q + lock(qu) do q for (d, i) in q.current_queue if haskey(squeue, d) state = pop!(squeue, d)[2] @@ -82,17 +82,19 @@ function Base.fill!(qu::Queue, s::Scheduler, init) for (k, v) in squeue q.current_queue[k] = Job(v...) end - end + end end return qu end function main_loop(s::Server, submit_channel, queue, main_loop_stop) fill!(queue, s.scheduler, true) - @info (timestamp=string(Dates.now()), username = get(ENV,"USER","nouser"), host = gethostname(), pid=getpid()) + @info (timestamp = string(Dates.now()), username = get(ENV, "USER", "nouser"), + host = gethostname(), pid = getpid()) # Used to identify if multiple servers are running in order to selfdestruct - log_mtimes = mtime.(joinpath.((config_path("logs/runtimes/"),), readdir(config_path("logs/runtimes/")))) + log_mtimes = mtime.(joinpath.((config_path("logs/runtimes/"),), + readdir(config_path("logs/runtimes/")))) t = Threads.@spawn while !main_loop_stop[] try fill!(queue, s.scheduler, false) @@ -113,19 +115,20 @@ function main_loop(s::Server, submit_channel, queue, main_loop_stop) Threads.@spawn while !main_loop_stop[] monitor_issues(log_mtimes) - try + try print_log(queue) catch @error "Logging error:" e stacktrace(catch_backtrace()) end if ispath(config_path("self_destruct")) - @info (timestamp = Dates.now(), message = "self_destruct found, self destructing...") + @info (timestamp = Dates.now(), + message = "self_destruct found, self destructing...") exit() end sleep(5) end fetch(t) - JSON3.write(config_path("jobs", "queue.json"), queue.info) + return JSON3.write(config_path("jobs", "queue.json"), queue.info) end function print_log(queue) @@ -133,7 +136,8 @@ function print_log(queue) end function monitor_issues(log_mtimes) - new_mtimes = mtime.(joinpath.((config_path("logs/runtimes"),), readdir(config_path("logs/runtimes")))) + new_mtimes = mtime.(joinpath.((config_path("logs/runtimes"),), + readdir(config_path("logs/runtimes")))) if length(new_mtimes) != length(log_mtimes) @error "More Server logs got created signalling a server was started while a previous was running." touch(config_path("self_destruct")) @@ -146,11 +150,11 @@ function monitor_issues(log_mtimes) daemon_log = config_path("logs/daemon/restapi.log") if filesize(daemon_log) > 1e9 open(daemon_log, "w") do f - write(f, "") + return write(f, "") end end end - + # Jobs are submitted by the daemon, using supplied job jld2 from the caller (i.e. another machine) # Additional files are packaged with the job function handle_job_submission!(queue, s::Server, submit_channel) @@ -169,7 +173,7 @@ function handle_job_submission!(queue, s::Server, submit_channel) id = submit(s.scheduler, j) @info (string(Dates.now()), j, id, Pending) lock(queue) do q - q.current_queue[j] = Job(id, Pending) + return q.current_queue[j] = Job(id, Pending) end curtries = -1 catch e @@ -185,7 +189,7 @@ function handle_job_submission!(queue, s::Server, submit_channel) @warn "Submission job at dir: $j is not a directory." end end - deleteat!(lines, 1:n_submit) + return deleteat!(lines, 1:n_submit) end function server_logger() @@ -198,12 +202,12 @@ end function restapi_logger() p = config_path("logs/daemon") mkpath(p) - FileLogger(joinpath(p, "restapi.log"); append = false) + return FileLogger(joinpath(p, "restapi.log"); append = false) end function job_logger(id::Int) p = config_path("logs/jobs") mkpath(p) - FileLogger(joinpath(p, "$id.log")) + return FileLogger(joinpath(p, "$id.log")) end function requestHandler(handler) @@ -230,7 +234,8 @@ function requestHandler(handler) end stop = Dates.now() @info (timestamp = string(stop), event = "End", tid = Threads.threadid(), - method = req.method, target = req.target, duration = Dates.value(stop - start), + method = req.method, target = req.target, + duration = Dates.value(stop - start), status = resp.status, bodysize = length(resp.body)) return resp end @@ -242,14 +247,14 @@ function AuthHandler(handler, user_uuid::UUID) uuid = HTTP.header(req, "USER-UUID") if UUID(uuid) == user_uuid t = ThreadPools.spawnbg() do - handler(req) + return handler(req) end return fetch(t) end end return HTTP.Response(401, "unauthorized") end -end +end function julia_main()::Cint # initialize_config_dir() @@ -257,17 +262,16 @@ function julia_main()::Cint port, server = listenany(ip"0.0.0.0", 8080) s.port = port user_uuid = UUID(s.uuid) - + router = HTTP.Router() submit_channel = Channel{String}(Inf) job_queue = Queue() setup_core_api!(router) setup_job_api!(router, submit_channel, job_queue, s.scheduler) setup_database_api!(router) - - + should_stop = Ref(false) - HTTP.register!(router, "GET", "/server/config", (req) -> s) + HTTP.register!(router, "GET", "/server/config", (req) -> s) t = @tspawnat min(Threads.nthreads(), 2) with_logger(server_logger()) do try @@ -277,12 +281,15 @@ function julia_main()::Cint rethrow(e) end end - HTTP.register!(router, "PUT", "/server/kill", (req) -> (should_stop[]=true; fetch(t); true)) + HTTP.register!(router, "PUT", "/server/kill", + (req) -> (should_stop[] = true; fetch(t); true)) save(s) with_logger(restapi_logger()) do - @info (timestamp = string(Dates.now()), username = ENV["USER"], host = gethostname(), pid=getpid(), port=port) - - @async HTTP.serve(router |> requestHandler |> x -> AuthHandler(x, user_uuid), "0.0.0.0", port, server=server) + @info (timestamp = string(Dates.now()), username = ENV["USER"], + host = gethostname(), pid = getpid(), port = port) + + @async HTTP.serve(router |> requestHandler |> x -> AuthHandler(x, user_uuid), + "0.0.0.0", port, server = server) while !should_stop[] sleep(1) end @@ -290,4 +297,3 @@ function julia_main()::Cint close(server) return 0 end - diff --git a/src/schedulers.jl b/src/schedulers.jl index cae580b..d988535 100644 --- a/src/schedulers.jl +++ b/src/schedulers.jl @@ -1,7 +1,7 @@ abstract type Scheduler end @kwdef struct Bash <: Scheduler - type::String="bash" + type::String = "bash" end @kwdef struct Slurm <: Scheduler @@ -9,31 +9,33 @@ end end @kwdef struct HQ <: Scheduler - type::String="hq" - server_command::String="hq" + type::String = "hq" + server_command::String = "hq" allocs::Vector{String} = String[] end StructTypes.StructType(::Type{Cmd}) = StructTypes.Struct() StructTypes.StructType(::Type{Scheduler}) = StructTypes.AbstractType() -StructTypes.subtypes(::Type{Scheduler}) = (bash = Bash, slurm = Slurm, hq=HQ) +StructTypes.subtypes(::Type{Scheduler}) = (bash = Bash, slurm = Slurm, hq = HQ) -StructTypes.StructType(::Type{Bash}) = StructTypes.Struct() +StructTypes.StructType(::Type{Bash}) = StructTypes.Struct() StructTypes.StructType(::Type{Slurm}) = StructTypes.Struct() -StructTypes.StructType(::Type{HQ}) = StructTypes.Struct() +StructTypes.StructType(::Type{HQ}) = StructTypes.Struct() StructTypes.subtypekey(::Type{Scheduler}) = :type -submit(::S, ::AbstractString) where {S<:Scheduler} = error("No submit method defined for $S.") -abort(::S, ::Int) where {S<:Scheduler} = error("No abort method defined for $S.") -jobstate(::S, ::Any) where {S<:Scheduler} = error("No jobstate method defined for $S.") +function submit(::S, ::AbstractString) where {S<:Scheduler} + return error("No submit method defined for $S.") +end +abort(::S, ::Int) where {S<:Scheduler} = error("No abort method defined for $S.") +jobstate(::S, ::Any) where {S<:Scheduler} = error("No jobstate method defined for $S.") submit_cmd(s::S) where {S<:Scheduler} = error("No submit_cmd method defined for $S.") submit_cmd(s::Slurm) = "sbatch" -submit_cmd(s::Bash) = "bash" -submit_cmd(s::HQ) = "hq" +submit_cmd(s::Bash) = "bash" +submit_cmd(s::HQ) = "hq" function is_reachable(server_command::String) - t = run(string2cmd("which $server_command"), wait=false) + t = run(string2cmd("which $server_command"); wait = false) while !process_exited(t) sleep(0.005) end @@ -54,13 +56,13 @@ name_directive(::HQ) = "#HQ --name" function parse_params(sched::Scheduler, preamble::String) m = match(r"$(directive_prefix(sched)) (.+)\n", preamble) params = Dict() - last = 0 + last = 0 while m !== nothing - s = split(replace(replace(m.captures[1], "-"=> ""), "=" => " ")) + s = split(replace(replace(m.captures[1], "-" => ""), "=" => " ")) tp = Meta.parse(s[2]) t = tp isa Symbol || tp isa Expr ? s[2] : tp params[s[1]] = t - last = m.offset + length(m.match) + last = m.offset + length(m.match) m = match(r"$(directive_prefix(sched)) (.+)\n", preamble, last) end return params, preamble[last:end] @@ -72,14 +74,15 @@ queue(::Bash) = Dict() function jobstate(::Bash, id::Int) out = Pipe() err = Pipe() - p = run(pipeline(ignorestatus(`ps -p $id`), stderr = err, stdout=out)) + p = run(pipeline(ignorestatus(`ps -p $id`); stderr = err, stdout = out)) close(out.in) close(err.in) return p.exitcode == 1 ? Completed : Running end -submit(::Bash, j::AbstractString) = - Int(getpid(run(Cmd(`bash job.sh`, detach=true, dir=j), wait=false))) +function submit(::Bash, j::AbstractString) + return Int(getpid(run(Cmd(`bash job.sh`; detach = true, dir = j); wait = false))) +end function abort(::Bash, id::Int) pids = [parse(Int, split(s)[1]) for s in readlines(`ps -s $id`)[2:end]] @@ -88,9 +91,10 @@ function abort(::Bash, id::Int) end end -in_queue(s::JobState) = - s in (Submitted, Pending, Running, Configuring, Completing, Suspended) - +function in_queue(s::JobState) + return s in (Submitted, Pending, Running, Configuring, Completing, Suspended) +end + ## SLURM ## function maybe_scheduler_restart(::Slurm) if occursin("error", read(`squeue -u $(ENV["USER"])`, String)) @@ -106,7 +110,8 @@ end function queue(sc::Slurm) qlines = readlines(`squeue -u $(ENV["USER"]) --format="%Z %i %T"`)[2:end] - return Dict([(s = split(x); s[1] => (parse(Int, s[2]), jobstate(sc, s[3]))) for x in qlines]) + return Dict([(s = split(x); s[1] => (parse(Int, s[2]), jobstate(sc, s[3]))) + for x in qlines]) end function jobstate(s::Slurm, id::Int) @@ -121,7 +126,7 @@ function jobstate(s::Slurm, id::Int) nothing end st != Unknown && return st - + cmd = `scontrol show job $id` try lines = read(cmd, String) @@ -172,25 +177,27 @@ function jobstate(::Slurm, state::AbstractString) return Unknown end -submit(::Slurm, j::AbstractString) = - parse(Int, split(read(Cmd(`sbatch job.sh`, dir=j), String))[end]) +function submit(::Slurm, j::AbstractString) + return parse(Int, split(read(Cmd(`sbatch job.sh`; dir = j), String))[end]) +end -abort(::Slurm, id::Int) = - run(`scancel $id`) +abort(::Slurm, id::Int) = run(`scancel $id`) #### HQ function maybe_scheduler_restart(sc::HQ) function readinfo() out = Pipe() err = Pipe() - run(pipeline(Cmd(Cmd(string.([split(sc.server_command)..., "server", "info"])), ignorestatus=true), stdout=out, stderr=err)) + run(pipeline(Cmd(Cmd(string.([split(sc.server_command)..., "server", "info"])); + ignorestatus = true); stdout = out, stderr = err)) close(out.in) close(err.in) return read(err, String) end - + if occursin("No online", readinfo()) - run(Cmd(Cmd(string.([split(sc.server_command)..., "server", "start"])), detach=true), wait=false) + run(Cmd(Cmd(string.([split(sc.server_command)..., "server", "start"])); + detach = true); wait = false) sleep(0.01) tries = 0 while occursin("No online", readinfo()) && tries < 10 @@ -214,15 +221,17 @@ function maybe_restart_allocs(sc::HQ) if length(alloc_lines) == 3 # no allocs -> add all allocs_to_add = sc.allocs else - alloc_args = map(a -> replace(strip(split(a, "|")[end-1]), "," => " "), alloc_lines[4:end-1]) - allocs_to_add = filter(a -> !any(x -> x == strip(split(a, "-- ")[end]), alloc_args), sc.allocs) + alloc_args = map(a -> replace(strip(split(a, "|")[end-1]), "," => " "), + alloc_lines[4:end-1]) + allocs_to_add = filter(a -> !any(x -> x == strip(split(a, "-- ")[end]), alloc_args), + sc.allocs) end - + for ac in allocs_to_add run(Cmd(string.([split(sc.server_command)..., "alloc", "add", split(ac)...]))) end end - + function queue(sc::HQ) all_lines = readlines(Cmd(string.([split(sc.server_command)..., "job", "list"]))) @@ -235,21 +244,25 @@ function queue(sc::HQ) jobinfos = [(s = split(x); (parse(Int, s[2]), jobstate(sc, s[6]))) for x in qlines] - workdir_line_id = - findfirst(x-> occursin("Working directory", x), readlines(Cmd(string.([split(sc.server_command)..., "job", "info", "$(jobinfos[1][1])"])))) + workdir_line_id = findfirst(x -> occursin("Working directory", x), + readlines(Cmd(string.([split(sc.server_command)..., "job", + "info", "$(jobinfos[1][1])"])))) + + function workdir(id) + return split(readlines(Cmd(string.([split(sc.server_command)..., "job", "info", + "$id"])))[workdir_line_id])[end-1] + end - workdir(id) = split(readlines(Cmd(string.([split(sc.server_command)..., "job", "info", "$id"])))[workdir_line_id])[end-1] - return Dict([workdir(x[1]) => x for x in jobinfos]) end function jobstate(s::HQ, id::Int) lines = readlines(Cmd(string.([split(s.server_command)..., "job", "info", "$id"]))) - + if length(lines) <= 1 return Unknown end - return jobstate(s, split(lines[findfirst(x->occursin("State", x), lines)])[4]) + return jobstate(s, split(lines[findfirst(x -> occursin("State", x), lines)])[4]) end function jobstate(::HQ, state::AbstractString) @@ -270,12 +283,14 @@ end function submit(h::HQ, j::AbstractString) chmod(joinpath(j, "job.sh"), 0o777) - out = read(Cmd(Cmd(string.([split(h.server_command)..., "submit", "./job.sh"])), dir=j), String) + out = read(Cmd(Cmd(string.([split(h.server_command)..., "submit", "./job.sh"])); + dir = j), String) if !occursin("successfully", out) error("Submission error for job in dir $j.") end return parse(Int, split(out)[end]) end -abort(h::HQ, id::Int) = - run(Cmd(string.([split(h.server_command)..., "job", "cancel", "$id"]))) +function abort(h::HQ, id::Int) + return run(Cmd(string.([split(h.server_command)..., "job", "cancel", "$id"]))) +end diff --git a/src/servers.jl b/src/servers.jl index e5dbf3b..4aa0134 100644 --- a/src/servers.jl +++ b/src/servers.jl @@ -31,21 +31,20 @@ end storage_directory(::Server) = "servers" -function configure_scheduler(s::Server; interactive=true) +function configure_scheduler(s::Server; interactive = true) scheduler = nothing if haskey(ENV, "DFC_SCHEDULER") - sched = ENV["DFC_SCHEDULER"] if occursin("hq", lowercase(sched)) cmd = get(ENV, "DFC_SCHEDULER_CMD", "hq") - return HQ(server_command=cmd) + return HQ(; server_command = cmd) elseif lowercase(sched) == "slurm" return Slurm() else error("Scheduler $sched not recognized please set a different DFC_SCHEDULER environment var.") end end - + for t in (HQ(), Slurm()) scmd = submit_cmd(t) if server_command(s, "which $scmd").exitcode == 0 @@ -57,12 +56,13 @@ function configure_scheduler(s::Server; interactive=true) return scheduler end if interactive && scheduler === nothing - choice = request("Couldn't identify the scheduler select one: ", RadioMenu(["SLURM", "HQ", "BASH"])) + choice = request("Couldn't identify the scheduler select one: ", + RadioMenu(["SLURM", "HQ", "BASH"])) if choice == 1 scheduler = Slurm() elseif choice == 2 - scheduler = HQ(server_command = ask_input(String, "HQ command", "hq")) + scheduler = HQ(; server_command = ask_input(String, "HQ command", "hq")) elseif choice == 3 scheduler = Bash() else @@ -73,10 +73,10 @@ function configure_scheduler(s::Server; interactive=true) return Bash() end end - -function configure!(s::Server; interactive=true) + +function configure!(s::Server; interactive = true) if interactive - s.port = ask_input(Int, "Port", s.port) + s.port = ask_input(Int, "Port", s.port) end if s.domain == "localhost" julia = joinpath(Sys.BINDIR, "julia") @@ -93,18 +93,19 @@ function configure!(s::Server; interactive=true) s.julia_exec = julia # Try auto configuring the scheduler - scheduler = configure_scheduler(s; interactive=interactive) + scheduler = configure_scheduler(s; interactive = interactive) if scheduler === nothing return end - s.scheduler = scheduler + s.scheduler = scheduler hdir = server_command(s, "pwd").stdout[1:end-1] if interactive dir = ask_input(String, "Default Jobs directory", hdir) if dir != hdir while server_command(s, "ls $dir").exitcode != 0 # @warn "$dir, no such file or directory." - local_choice = request("No such directory, creating one?", RadioMenu(["yes", "no"])) + local_choice = request("No such directory, creating one?", + RadioMenu(["yes", "no"])) if local_choice == 1 result = server_command(s, "mkdir -p $dir") if result.exitcode != 0 @@ -115,7 +116,7 @@ function configure!(s::Server; interactive=true) end end end - + s.root_jobdir = dir s.max_concurrent_jobs = ask_input(Int, "Max Concurrent Jobs", s.max_concurrent_jobs) else @@ -141,12 +142,12 @@ end Runs through interactive configuration of the local [`Server`](@ref). """ -function configure_local(;interactive=true) +function configure_local(; interactive = true) host = gethostname() - @assert !exists(Server(name=host)) "Local server already configured." + @assert !exists(Server(; name = host)) "Local server already configured." user = get(ENV, "USER", "nouser") - s = Server(name=host, username=user, domain="localhost") - configure!(s; interactive=interactive) + s = Server(; name = host, username = user, domain = "localhost") + configure!(s; interactive = interactive) @info "saving server configuration...", s save(s) @@ -161,7 +162,7 @@ function configure_local(;interactive=true) end function Server(s::String) - t = Server(name=s) + t = Server(; name = s) if exists(t) return load(t) end @@ -170,11 +171,11 @@ function Server(s::String) if occursin("@", s) username, domain = split(s, "@") name = ask_input(String, "Please specify the Server's identifying name") - if exists(Server(name=name, username=username, domain=domain)) + if exists(Server(; name = name, username = username, domain = domain)) @warn "A server with $name was already configured and will be overwritten." end elseif s == "localhost" - username = get(ENV,"USER","nouser") + username = get(ENV, "USER", "nouser") domain = "localhost" name = s else @@ -188,17 +189,18 @@ function Server(s::String) if server !== nothing server.name = name server.domain = domain - - change_config = request("Found remote server configuration:\n$server\nIs this correct?", RadioMenu(["yes", "no"])) + + change_config = request("Found remote server configuration:\n$server\nIs this correct?", + RadioMenu(["yes", "no"])) change_config == -1 && return if change_config == 2 configure!(server) end configure_local_port!(server) - + else @info "Couldn't pull server configuration, creating new..." - server = Server(name=name, domain=domain, username=username) + server = Server(; name = name, domain = domain, username = username) configure!(server) configure_local_port!(server) end @@ -218,9 +220,10 @@ function install_RemoteHPC(s::Server, julia_exec = nothing) @info "No julia found in PATH, installing it..." t = tempname() mkdir(t) - download("https://julialang-s3.julialang.org/bin/linux/x64/1.8/julia-1.8.2-linux-x86_64.tar.gz", joinpath(t, "julia.tar.gz")) + download("https://julialang-s3.julialang.org/bin/linux/x64/1.8/julia-1.8.2-linux-x86_64.tar.gz", + joinpath(t, "julia.tar.gz")) push(joinpath(t, "julia.tar.gz"), s, "julia-1.8.2-linux-x86_64.tar.gz") - rm(t, recursive=true) + rm(t; recursive = true) res = server_command(s, "tar -xf julia-1.8.2-linux-x86_64.tar.gz") @assert res.exitcode == 0 "Issue unpacking julia executable on cluster, please install julia manually" julia_exec = "~/julia-1.8.2/bin/julia" @@ -234,11 +237,11 @@ function install_RemoteHPC(s::Server, julia_exec = nothing) @assert res.exitcode == 0 "Something went wrong installing RemoteHPC on server, please install manually" @info "RemoteHPC installed on remote cluster, try starting the server with `start(server)`." - s.julia_exec = julia_exec + return s.julia_exec = julia_exec end function update_RemoteHPC(s::Server) - alive = isalive(s) + alive = isalive(s) if alive @info "Server running, killing it first." kill(s) @@ -253,8 +256,10 @@ function update_RemoteHPC(s::Server) end Base.joinpath(s::Server, p...) = joinpath(s.root_jobdir, p...) -Base.ispath(s::Server, p...) = islocal(s) ? ispath(p...) : - JSON3.read(HTTP.get(s, "/ispath/" * joinpath(p...)).body, Bool) +function Base.ispath(s::Server, p...) + return islocal(s) ? ispath(p...) : + JSON3.read(HTTP.get(s, "/ispath/" * joinpath(p...)).body, Bool) +end function Base.symlink(s::Server, p, p2) if islocal(s) @@ -267,13 +272,13 @@ end function Base.rm(s::Server, p::String) if islocal(s) - isdir(p) ? rm(p, recursive=true) : rm(p) + isdir(p) ? rm(p; recursive = true) : rm(p) else HTTP.post(s, "/rm/" * p) return nothing end end -function Base.read(s::Server, path::String, type=nothing) +function Base.read(s::Server, path::String, type = nothing) if islocal(s) return type === nothing ? read(path) : read(path, type) else @@ -297,9 +302,11 @@ read_config(config_file) = parse_config(read(config_file, String)) function load_config(username, domain) hostname = gethostname(username, domain) if domain == "localhost" - return parse_config(read(config_path("storage","servers","$hostname.json"),String)) + return parse_config(read(config_path("storage", "servers", "$hostname.json"), + String)) else - t = server_command(username, domain, "cat ~/.julia/config/RemoteHPC/$hostname/storage/servers/$hostname.json") + t = server_command(username, domain, + "cat ~/.julia/config/RemoteHPC/$hostname/storage/servers/$hostname.json") if t.exitcode != 0 return nothing else @@ -310,23 +317,31 @@ end function load_config(s::Server) if isalive(s) return JSON3.read(HTTP.get(s, "/server/config").body, Server) - else + else return load_config(s.username, s.domain) end end -Base.gethostname(username::AbstractString, domain::AbstractString) = split(server_command(username, domain, "hostname").stdout)[1] +function Base.gethostname(username::AbstractString, domain::AbstractString) + return split(server_command(username, domain, "hostname").stdout)[1] +end Base.gethostname(s::Server) = gethostname(s.username, s.domain) ssh_string(s::Server) = s.username * "@" * s.domain -http_string(s::Server) = s.local_port != 0 ? "http://localhost:$(s.local_port)" : "http://$(s.domain):$(s.port)" +function http_string(s::Server) + return s.local_port != 0 ? "http://localhost:$(s.local_port)" : + "http://$(s.domain):$(s.port)" +end function Base.rm(s::Server) return ispath(joinpath(SERVER_DIR, s.name * ".json")) && rm(joinpath(SERVER_DIR, s.name * ".json")) end -find_tunnel(s) = - getfirst(x->occursin("ssh -N -f -L $(s.local_port)", x), split(read(pipeline(`ps aux` , stdout = `grep $(s.local_port)`), String), "\n")) +function find_tunnel(s) + return getfirst(x -> occursin("ssh -N -f -L $(s.local_port)", x), + split(read(pipeline(`ps aux`; stdout = `grep $(s.local_port)`), String), + "\n")) +end function destroy_tunnel(s) t = find_tunnel(s) @@ -340,10 +355,11 @@ function destroy_tunnel(s) end function construct_tunnel(s) - run(Cmd(`ssh -N -f -L $(s.local_port):localhost:$(s.port) $(ssh_string(s))`, detach=true)) + return run(Cmd(`ssh -N -f -L $(s.local_port):localhost:$(s.port) $(ssh_string(s))`; + detach = true)) end -function ask_input(::Type{T}, message, default=nothing) where {T} +function ask_input(::Type{T}, message, default = nothing) where {T} if default === nothing t = "" print(message * ": ") @@ -363,7 +379,7 @@ function ask_input(::Type{T}, message, default=nothing) where {T} return t end end - + """ pull(server::Server, remote::String, loc::String) @@ -376,7 +392,8 @@ function pull(server::Server, remote::String, loc::String) else out = Pipe() err = Pipe() - run(pipeline(`scp -r $(ssh_string(server) * ":" * remote) $path`, stdout=out, stderr=err)) + run(pipeline(`scp -r $(ssh_string(server) * ":" * remote) $path`; stdout = out, + stderr = err)) close(out.in) close(err.in) stderr = read(err, String) @@ -398,7 +415,8 @@ function push(filename::String, server::Server, server_file::String) else out = Pipe() err = Pipe() - run(pipeline(`scp $filename $(ssh_string(server) * ":" * server_file)`, stdout=out, stderr=err)) + run(pipeline(`scp $filename $(ssh_string(server) * ":" * server_file)`; + stdout = out, stderr = err)) close(out.in) close(err.in) end @@ -409,26 +427,27 @@ function server_command(username, domain, cmd::String) out = Pipe() err = Pipe() if domain == "localhost" - process = run(pipeline(ignorestatus(Cmd(string.(split(cmd)))), stdout=out, stderr=err)) + process = run(pipeline(ignorestatus(Cmd(string.(split(cmd)))); stdout = out, + stderr = err)) else - process = run(pipeline(ignorestatus(Cmd(["ssh", "$(username * "@" * domain)", string.(split(cmd))...])), stdout=out, stderr=err)) + process = run(pipeline(ignorestatus(Cmd(["ssh", "$(username * "@" * domain)", + string.(split(cmd))...])); stdout = out, + stderr = err)) end close(out.in) close(err.in) stdout = read(out, String) stderr = read(err, String) - return ( - stdout = stdout, - stderr = stderr, - exitcode = process.exitcode - ) + return (stdout = stdout, + stderr = stderr, + exitcode = process.exitcode) end - + server_command(s::Server, cmd) = server_command(s.username, s.domain, cmd) function has_modules(s::Server) - try + try server_command(s, "module avail").code == 0 catch false @@ -448,8 +467,7 @@ function Base.readdir(s::Server, dir::String) return JSON3.read(resp.body, Vector{String}) end -Base.abspath(s::Server, p) = - isabspath(p) ? p : joinpath(s, p) +Base.abspath(s::Server, p) = isabspath(p) ? p : joinpath(s, p) function Base.mtime(s::Server, p) if islocal(s) @@ -468,6 +486,3 @@ function Base.filesize(s::Server, p) return JSON3.read(resp.body, Float64) end end - - - diff --git a/src/types.jl b/src/types.jl index e259c31..0097c98 100644 --- a/src/types.jl +++ b/src/types.jl @@ -23,7 +23,7 @@ Will first transform `flags` into a `Vector{ExecFlag}`, and construct the [`Exec modules::Vector{String} = String[] parallel::Bool = true end -Exec(str::String;kwargs...) = Exec(name=str; kwargs...) +Exec(str::String; kwargs...) = Exec(; name = str, kwargs...) Base.:(==)(e1::Exec, e2::Exec) = e1.name == e2.name storage_directory(::Exec) = "execs" StructTypes.StructType(::Exec) = StructTypes.Mutable() @@ -40,7 +40,8 @@ function isrunnable(e::Exec) else cmd = `echo "source /etc/profile && ldd $fullpath"` end - run(pipeline(pipeline(cmd, stdout=ignorestatus(`bash`)), stdout = out, stderr=err)) + run(pipeline(pipeline(cmd; stdout = ignorestatus(`bash`)); stdout = out, + stderr = err)) close(out.in) close(err.in) @@ -68,9 +69,8 @@ StructTypes.StructType(::Calculation) = StructTypes.Mutable() postamble::String = "" parallel_exec::Exec = Exec() end -Environment(name::String; kwargs...) = Environment(name=name; kwargs...) +Environment(name::String; kwargs...) = Environment(; name = name, kwargs...) Base.:(==)(e1::Environment, e2::Environment) = e1.name == e2.name storage_directory(::Environment) = "environments" - StructTypes.StructType(::Environment) = StructTypes.Mutable() diff --git a/test/runtests.jl b/test/runtests.jl index bd73d93..7ded7f0 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,7 +2,7 @@ using Test using RemoteHPC tconfdir = tempname() if ispath(tconfdir) - rm(tconfdir, recursive=true) + rm(tconfdir; recursive = true) end import RemoteHPC: config_path config_path(p...) = joinpath(tconfdir, p...) @@ -13,15 +13,15 @@ paths = ["jobs", "storage/servers", "storage/execs", "storage/environments"] -for p in paths +for p in paths mkpath(config_path(p)) end redirect_stdin(devnull) do redirect_stderr(devnull) do redirect_stdout(devnull) do - RemoteHPC.configure_local(interactive=false) - t = @async RemoteHPC.julia_main() + RemoteHPC.configure_local(; interactive = false) + return t = @async RemoteHPC.julia_main() end end end @@ -55,7 +55,10 @@ for sched in scheds @test st.scheduler == sched end @testset "database" begin - exec = RemoteHPC.Exec("test", "cat", "", Dict("f" => 3, "test" => [1, 2, 3], "test2" => "stringtest", "-nk" => 10), ["intel", "intel-mkl"], true) + exec = RemoteHPC.Exec("test", "cat", "", + Dict("f" => 3, "test" => [1, 2, 3], + "test2" => "stringtest", "-nk" => 10), + ["intel", "intel-mkl"], true) save(s, exec) te = load(s, exec) for f in fieldnames(Exec) @@ -63,9 +66,11 @@ for sched in scheds end exec = RemoteHPC.Exec("test", "cat", "", Dict(), [], false) redirect_stderr(devnull) do - save(s, exec) + return save(s, exec) end - e = Environment("test", Dict("N" => 1, "time" => "00:01:01"), Dict("OMP_NUM_THREADS" => 1), "", "", RemoteHPC.Exec(name = "srun", exec="srun")) + e = Environment("test", Dict("N" => 1, "time" => "00:01:01"), + Dict("OMP_NUM_THREADS" => 1), "", "", + RemoteHPC.Exec(; name = "srun", exec = "srun")) partition = get(ENV, "SLURM_PARTITION", nothing) account = get(ENV, "SLURM_ACCOUNT", nothing) if partition !== nothing @@ -73,7 +78,7 @@ for sched in scheds end if account !== nothing e.directives["account"] = account - end + end save(s, e) te = load(s, e) @@ -81,17 +86,16 @@ for sched in scheds @test getfield(te, f) == getfield(e, f) end - es = load(s, Exec("ca")) @test length(es) == 1 - es = load(s, Exec(dir="")) + es = load(s, Exec(; dir = "")) @test length(es) == 1 - end @testset "job" begin @testset "creation and save" begin exec = load(s, Exec("test")) - c = [Calculation(exec, "< scf.in > scf.out", true), Calculation(exec, "< nscf.in > nscf.out", true)] + c = [Calculation(exec, "< scf.in > scf.out", true), + Calculation(exec, "< nscf.in > nscf.out", true)] e = load(s, Environment("test")) save(s, t_jobdir, e, c; name = "testjob") @test state(s, t_jobdir) == RemoteHPC.Saved @@ -115,11 +119,13 @@ for sched in scheds @test read(joinpath(t_jobdir, "scf.out"), String) == "test input" @test read(joinpath(t_jobdir, "nscf.out"), String) == "test input2" exec = load(s, Exec("test")) - sleep_e = Exec(name="sleep", exec="sleep", parallel=false) - c = [Calculation(exec, "< scf.in > scf.out", true), Calculation(exec, "< nscf.in > nscf.out", true), Calculation(sleep_e, "10", true)] + sleep_e = Exec(; name = "sleep", exec = "sleep", parallel = false) + c = [Calculation(exec, "< scf.in > scf.out", true), + Calculation(exec, "< nscf.in > nscf.out", true), + Calculation(sleep_e, "10", true)] e = load(s, Environment("test")) - - submit(s, t_jobdir, e, c, name="testjob") + + submit(s, t_jobdir, e, c; name = "testjob") while state(s, t_jobdir) != RemoteHPC.Running sleep(0.1) end @@ -127,14 +133,13 @@ for sched in scheds @test state(s, t_jobdir) == RemoteHPC.Cancelled rm(s, t_jobdir) @test !ispath(s, t_jobdir) - end end end end @testset "files api" begin @test length(readdir(s, config_path())) == 3 - @test filesize(s, config_path("logs/daemon/restapi.log")) > 0 + @test filesize(s, config_path("logs/daemon/restapi.log")) > 0 @test mtime(s, config_path("logs/daemon/restapi.log")) > 0 tname = tempname() write(s, tname, "test")