diff --git a/Project.toml b/Project.toml index 5b14a07..4a28507 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "RemoteHPC" uuid = "c4f2a1c4-7655-40d8-82ee-c6ae0a8b7409" authors = ["louisponet "] -version = "0.3.27" +version = "0.3.28" [deps] BinaryTraits = "190e46ec-f771-4705-b939-984896f7be0e" diff --git a/src/client.jl b/src/client.jl index a5e80ae..60d7f55 100644 --- a/src/client.jl +++ b/src/client.jl @@ -4,108 +4,150 @@ Launches the daemon process on the host [`Server`](@ref) `s`. """ function start(s::Server; verbosity=0) - if !islocal(s) && !isalive(LOCAL_SERVER[]) - @warn "Local server not running. Starting that first." - start(LOCAL_SERVER[]) - if !isalive(LOCAL_SERVER[]) - error("Couldn't start local server.") - end - end - alive = isalive(s) - @assert !alive "Server is already up and running." - @debug "Starting:\n$s" - hostname = gethostname(s) - conf_path = config_path(s) - t = server_command(s, "ls $(conf_path)") - if t.exitcode != 0 - error("RemoteHPC not installed on server. Install it using `RemoteHPC.install_RemoteHPC(Server(\"$(s.name)\"))`") - end - - if islocal(s) - t = ispath(config_path("self_destruct")) - else - cmd = "cat $(conf_path)/$hostname/self_destruct" - t = server_command(s, cmd).exitcode == 0 - end - @assert !t "Self destruction was previously triggered, signalling issues on the Server.\nPlease investigate and if safe, remove $(conf_path)/self_destruct" + + title = "Starting Server($(s.name))" + steps = ["Verifying that local server is running", + "Verifying that the server isn't already alive", + "Starting server", + "Waiting for server connection"] + + StepSpinner(title, steps) do spinner + + + if !islocal(s) && !isalive(LOCAL_SERVER[]) + push!(spinner, "Starting local server.") + start(LOCAL_SERVER[]) + if !isalive(LOCAL_SERVER[]) + finish!(spinner, ErrorException("Couldn't start local server.")) + end + end + push!(spinner, "local server running") - if !islocal(s) - t = deepcopy(s) - t.domain = "localhost" - t.name = hostname - tf = tempname() - JSON3.write(tf, t) - push(tf, s, "$(conf_path)/$hostname/storage/servers/$hostname.json") - end + next!(spinner) + + alive = isalive(s) || get(JSON3.read(check_connections(names=[s.name]).body), Symbol(s.name), false) + if alive + push!(spinner, "Server is already up and running.") + finish!(spinner) + return + end - # Here we check what the modify time of the server-side localhost file is. - # The server will rewrite the file with the correct port, which we use to see - # whether the server started succesfully. - function checktime() - curtime = 0 + next!(spinner) + + hostname = gethostname(s) + conf_path = config_path(s) + t = server_command(s, "ls $(conf_path)") + + if t.exitcode != 0 + + finish!(spinner, ErrorException("RemoteHPC not installed on server. Install it using `RemoteHPC.install(Server(\"$(s.name)\"))`")) + + end + if islocal(s) - return mtime(config_path("storage", "servers", "$(hostname).json")) + self_destructed = ispath(config_path("self_destruct")) + else - cmd = "stat -c %Z $(conf_path)/$hostname/storage/servers/$(hostname).json" - return parse(Int, server_command(s.username, s.domain, cmd)[1]) + cmd = "cat $(conf_path)/$hostname/self_destruct" + self_destructed = server_command(s, cmd).exitcode == 0 + end - return curtime - end - firstime = checktime() + + if self_destructed + finish!(spinner, + ErrorException("""Self destruction was previously triggered, signalling issues on the Server. + Please investigate and if safe, remove $(conf_path)/self_destruct""")) + end + + if !islocal(s) + t = deepcopy(s) + t.domain = "localhost" + t.name = hostname + tf = tempname() + JSON3.write(tf, t) + push(tf, s, "$(conf_path)/$hostname/storage/servers/$hostname.json") + end + + # Here we check what the modify time of the server-side localhost file is. + # The server will rewrite the file with the correct port, which we use to see + # whether the server started succesfully. + function checktime() + curtime = 0 + if islocal(s) + return mtime(config_path("storage", "servers", "$(hostname).json")) + else + cmd = "stat -c %Z $(conf_path)/$hostname/storage/servers/$(hostname).json" + return parse(Int, server_command(s.username, s.domain, cmd)[1]) + end + return curtime + end + firstime = checktime() - p = "$(conf_path)/$hostname/logs/errors.log" - scrpt = "using RemoteHPC; RemoteHPC.julia_main(verbose=$(verbosity))" - if s.domain != "localhost" - julia_cmd = replace("""$(s.julia_exec) --project=$(conf_path) --startup-file=no -t 10 -e "using RemoteHPC; RemoteHPC.julia_main(verbose=$(verbosity))" &> $p""", - "'" => "") - if Sys.which("ssh") === nothing - OpenSSH_jll.ssh() do ssh_exec - run(Cmd(`$ssh_exec -f $(ssh_string(s)) $julia_cmd`; detach = true)) + p = "$(conf_path)/$hostname/logs/errors.log" + scrpt = "using RemoteHPC; RemoteHPC.julia_main(verbose=$(verbosity))" + + if s.domain != "localhost" + julia_cmd = replace("""$(s.julia_exec) --project=$(conf_path) --startup-file=no -t 10 -e "using RemoteHPC; RemoteHPC.julia_main(verbose=$(verbosity))" &> $p""", + "'" => "") + if Sys.which("ssh") === nothing + OpenSSH_jll.ssh() do ssh_exec + run(Cmd(`$ssh_exec -f $(ssh_string(s)) $julia_cmd`; detach = true)) + end + else + run(Cmd(`ssh -f $(ssh_string(s)) $julia_cmd`; detach = true)) end else - run(Cmd(`ssh -f $(ssh_string(s)) $julia_cmd`; detach = true)) + e = s.julia_exec * " --project=$(conf_path)" + julia_cmd = Cmd([string.(split(e))..., "--startup-file=no", "-t", "auto", "-e", + scrpt, "&>", p, "&"]) + run(Cmd(julia_cmd; detach = true); wait = false) end - else - e = s.julia_exec * " --project=$(conf_path)" - julia_cmd = Cmd([string.(split(e))..., "--startup-file=no", "-t", "auto", "-e", - scrpt, "&>", p, "&"]) - run(Cmd(julia_cmd; detach = true); wait = false) - end - - #TODO: little hack here - retries = 0 - prog = ProgressUnknown("Waiting for server bootup:"; spinner = true) - while checktime() <= firstime && retries < 60 - ProgressMeter.next!(prog; spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏", showvalues = [(:try, retries)]) - retries += 1 - sleep(1) - end - finish!(prog) - if retries == 60 - error("Something went wrong starting the server.") - else + retries = 0 + push!(spinner, "Waiting for server bootup") + + while checktime() <= firstime && retries < 60 + retries += 1 + sleep(1) + end + + if retries == 60 + finish!(spinner, ErrorException("Something went wrong starting the server.")) + end + + next!(spinner) + cfg = load_config(s) s.port = cfg.port s.uuid = cfg.uuid - @debug "Daemon on Server $(s.name) started, listening on local port $(s.port)." - @debug "Saving updated server info..." save(s) - end - if islocal(s) - while !isalive(s) - sleep(0.1) + + retries = 0 + + if islocal(s) + while !isalive(s) && retries < 60 + sleep(0.1) + retries += 1 + end + LOCAL_SERVER[] = local_server() + + else + check_connections(; names=[s.name]) + while !isalive(s) && retries < 60 + sleep(0.1) + end end - LOCAL_SERVER[] = local_server() - else - check_connections(; names=[s.name]) - while !isalive(s) - sleep(0.1) + + if retries == 60 + finish!(spinner, ErrorException("""Couldn't set up server connection. + This can be because the daemon crashed + or because the local server can't setup a ssh tunnel to it""")) end + + return s end - return s end """ diff --git a/src/runtime.jl b/src/runtime.jl index a2b9fbc..77a9907 100644 --- a/src/runtime.jl +++ b/src/runtime.jl @@ -279,12 +279,13 @@ end function check_connections!(connections, verify_tunnels; names=keys(connections)) @debug "Checking connections..." logtype=RuntimeLog - for (n, connected) in connections + + for n in names if !exists(Server(name=n)) pop!(connections, n) continue end - !(n in names) && continue + s = load(Server(n)) s.domain == "localhost" && continue @@ -296,22 +297,20 @@ function check_connections!(connections, verify_tunnels; names=keys(connections) connections[n] = false end end + if verify_tunnels @debugv 1 "Verifying tunnels" logtype=RuntimeLog - for (n, connected) in connections - connected && continue - !(n in names) && continue + for n in names + + connections[n] && continue + s = load(Server(n)) s.domain == "localhost" && continue - - if find_tunnel(s) !== nothing - @debugv 0 "Couldn't connect to Server $n but tunnel exists so ignoring. Destroy it manually to try creating a new one and reconnect." - continue - end @debugv 0 "Connection to $n: $(connections[n])" logtype=RuntimeLog connections[n] = @timeout 30 begin + destroy_tunnel(s) try remote_server = load_config(s.username, s.domain, config_path(s)) remote_server === nothing && return false @@ -336,20 +335,26 @@ function check_connections!(connections, verify_tunnels; names=keys(connections) end false end end + return connections end function check_connections!(server_data::ServerData, args...; kwargs...) all_servers = load(Server("")) + for k in filter(x-> !(x in all_servers), keys(server_data.connections)) delete!(server_data.connections, k) end + for n in all_servers n == server_data.server.name && continue server_data.connections[n] = get(server_data.connections, n, false) end + conn = check_connections!(server_data.connections, args...; kwargs...) + @debugv 1 "Connections: $(server_data.connections)" logtype=RuntimeLog + return conn end @@ -376,10 +381,10 @@ function julia_main(;verbose=0, kwargs...)::Cint connection_task = Threads.@spawn @stoppable server_data.stop begin @debug "Checking Connections" logtype=RuntimeLog try - check_connections!(server_data, true) + check_connections!(server_data, false) while true t = time() - check_connections!(server_data, true) + check_connections!(server_data, false) sleep_t = 5 - (time() - t) if sleep_t > 0 sleep(sleep_t) diff --git a/src/servers.jl b/src/servers.jl index d4d11f1..58622e4 100644 --- a/src/servers.jl +++ b/src/servers.jl @@ -125,7 +125,7 @@ function configure!(s::Server; interactive = true) yn_id == -1 && return if yn_id == 1 s.julia_exec = install_julia(s) - install_RemoteHPC(s) + install(s) else @debug """ You will need to install julia, e.g. by using `RemoteHPC.install_julia` or manually on the cluster. @@ -173,7 +173,7 @@ function configure!(s::Server; interactive = true) conf_path = config_path(s) t = server_command(s, "ls $(conf_path)") if t.exitcode != 0 - install_RemoteHPC(s) + install(s) end s.uuid = string(uuid4()) return s @@ -220,99 +220,208 @@ end # TODO use versions.json from main julia site function install_julia(s::Server) + julia_tar = "julia-1.8.5-linux-x86_64.tar.gz" - p = ProgressUnknown("Installing julia on Server $(s.name) ($(s.username)@$(s.domain))...", spinner=true) - t = tempname() - mkdir(t) - next!(p, showvalues = [("step [1/3]", "downloading")]) - download("https://julialang-s3.julialang.org/bin/linux/x64/1.8/julia-1.8.5-linux-x86_64.tar.gz", + + title = "Installing julia on Server $(s.name) ($(s.username)@$(s.domain))..." + steps = ["downloading locally", + "pushing to remote", + "unpacking on remote"] + + StepSpinner(title, steps) do spinner + t = tempname() + mkdir(t) + download("https://julialang-s3.julialang.org/bin/linux/x64/1.8/julia-1.8.5-linux-x86_64.tar.gz", joinpath(t, "julia.tar.gz")) - next!(p, showvalues = [("step [2/3]", "pushing")]) - push(joinpath(t, "julia.tar.gz"), s, julia_tar) - rm(t; recursive = true) - next!(p, showvalues = [("step [3/3]", "unpacking")]) - res = server_command(s, "tar -xf $julia_tar") - server_command(s, "rm $julia_tar") - finish!(p) - @assert res.exitcode == 0 "Issue unpacking julia executable on cluster, please install julia manually" - @debug "julia installed on Server $(s.name) in ~/julia-1.8.5/bin" - return "~/julia-1.8.5/bin/julia" -end - -function install_RemoteHPC(s::Server, julia_exec = s.julia_exec) + + next!(spinner) + + push(joinpath(t, "julia.tar.gz"), s, julia_tar) + + rm(t; recursive = true) + + next!(spinner) + + res = server_command(s, "tar -xf $julia_tar") + if res.exitcode != 0 + finish!(spinner, ErrorException("Issue unpacking julia executable on cluster, please install julia manually")) + end + server_command(s, "rm $julia_tar") + + return "~/julia-1.8.5/bin/julia" + end +end + +function install(s::Server, julia_exec = s.julia_exec) # We install the latest version of julia in the homedir - res = server_command(s, "which $julia_exec") - if res.exitcode != 0 - julia_exec = install_julia(s) + + title = "Installing RemoteHPC on remote" + steps = ["installing julia", + "installing RemoteHPC"] + + StepSpinner(title, steps) do spinner + res = server_command(s, "which $julia_exec") + if res.exitcode != 0 + julia_exec = install_julia(s) + else + julia_exec = res.stdout[1:end-1] + end + next!(spinner) + + s.julia_exec = julia_exec + res = julia_cmd(s, "using Pkg; Pkg.activate(joinpath(Pkg.depots()[1], \"config/RemoteHPC\")); Pkg.add(\"RemoteHPC\");Pkg.build(\"RemoteHPC\")") + + if res.exitcode != 0 + finish!(spinner, ErrorException("Something went wrong installing RemoteHPC on server, please install manually")) + end + end + @info "RemoteHPC installed on remote cluster, try starting the server with `start(server)`." +end + +mutable struct StepSpinner + steps::Vector{String} + step_msgs::Vector{Vector{String}} + curstep::Int + dt::Float64 + spinner::String + finished::Bool + prog::ProgressUnknown + t::Union{Task, Nothing} + showvalues::Vector{NTuple{2, String}} +end + +function StepSpinner(title::String, steps::Vector{String}; dt=0.1, spinner = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏") + step_msgs = [String[] for i = 1:length(steps)] + + prog = ProgressUnknown(title, spinner=true, dt=0.0) + + out = StepSpinner(steps, step_msgs, 1, dt, spinner, false, prog, nothing, NTuple{2,String}[]) + + out.showvalues = showvalues(out) + return out +end + +Base.length(s::StepSpinner) = length(s.steps) + +function ProgressMeter.next!(s::StepSpinner) + s.curstep += 1 + s.showvalues = showvalues(s) +end + +function ProgressMeter.finish!(s::StepSpinner, msg=nothing) + s.finished = true + + if s.t !== nothing && !istaskdone(s.t) + fetch(s.t) + end + + if msg isa AbstractString + finish!(s.prog; showvalues = ("SUCCESS", msg)) + elseif msg === nothing + finish!(s.prog) else - julia_exec = res.stdout[1:end-1] + finish!(s.prog; spinner = '✗') + throw(msg) end - @info "Installing RemoteHPC" - s.julia_exec = julia_exec - res = julia_cmd(s, "using Pkg; Pkg.activate(joinpath(Pkg.depots()[1], \"config/RemoteHPC\")); Pkg.add(\"RemoteHPC\");Pkg.build(\"RemoteHPC\")") - @assert res.exitcode == 0 "Something went wrong installing RemoteHPC on server, please install manually" +end - @info "RemoteHPC installed on remote cluster, try starting the server with `start(server)`." - return +function showvalues(s::StepSpinner) + nsteps = length(s) + out = NTuple{2, String}[] + for i = 1:s.curstep + + t = [("Step [$i/$nsteps]", s.steps[i])] + + for (im, msg) in enumerate(s.step_msgs[i]) + push!(t, ("$im", msg)) + end + append!(out, t) + end + + return out +end + +function Base.push!(s::StepSpinner, msg::String) + curlen = length(s.step_msgs[s.curstep]) + push!(s.step_msgs[s.curstep], msg) + + push!(s.showvalues, ("$(curlen+1)", msg)) end -function update_RemoteHPC(s::Server) - p = ProgressUnknown("Updating RemoteHPC on Server $(s.name) ($(s.username)@$(s.domain))...", spinner=true, dt=0.0) - curvals = [("step [1/3]", "Checking server status")] - finished = false - ptsk = Threads.@spawn begin - while !finished - next!(p, showvalues = curvals, spinner="⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏") - sleep(0.1) +function StepSpinner(f::Function, args...; kwargs...) + s = StepSpinner(args...; kwargs...) + s.t = Threads.@spawn begin + while !s.finished + next!(s.prog, showvalues = s.showvalues, spinner=s.spinner) + sleep(s.dt) end end - v = nothing try - v = version(s) - curvals = [("step [1/3]", "Current version $v")] - catch - curvals = [("step [1/3]", "Current version could not be determined")] + return f(s) + finally + finish!(s) end +end + +function update(s::Server) + + title = "Updating RemoteHPC on Server $(s.name) ($(s.username)@$(s.domain))..." + + steps = ["Checking server status", + "Updating RemoteHPC", + "Restarting Server if needed"] + + StepSpinner(title, steps, spinner= "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏", dt=0.1) do spinner + v = nothing + try + v = version(s) + push!(spinner, "Current version $v") + catch + push!(spinner, "Current version could not be determined") + end - alive = isalive(s) - if alive - curvals = [("step [1/3]", "Server was alive, killing")] - kill(s) - end - curvals = [("step [2/3]", "Updating RemoteHPC")] - if islocal(s) - curproj = Pkg.project().path - Pkg.activate(joinpath(depot_path(s), "config/RemoteHPC")) - Pkg.update() - Pkg.activate(curproj) - else - curvals = [("step [2/3]", "Executing remote update command")] - res = julia_cmd(s, "using Pkg; Pkg.activate(joinpath(Pkg.depots()[1], \"config/RemoteHPC\")); Pkg.update(\"RemoteHPC\")") - if res.exitcode != 0 - finished = true - fetch(ptsk) - finish!(p, spinner='✗') - error("Error while updating Server $(s.name):\nstdout: $(res.stdout) stderr: $(res.stderr) exitcode: $(res.exitcode)") + alive = isalive(s) + if alive + push!(spinner, curvals = "Server was alive, killing") + kill(s) end - end - curvals = [("step [3/3]", "Restarting Server if needed")] - if alive - @debug "Restarting server." - start(s) - end - finished = true - fetch(ptsk) - finish!(p) - if v !== nothing - newver = version(s) - if v == newver - @warn "Version did not update, is RemoteHPC installed from a fixed path on the Server?" + + next!(spinner) + + if islocal(s) + curproj = Pkg.project().path + Pkg.activate(joinpath(depot_path(s), "config/RemoteHPC")) + push!(spinner, "Executing local update command") + Pkg.update() + Pkg.activate(curproj) else - @info "Version $v -> $newver" + push!(spinner, "Executing remote update command") + res = julia_cmd(s, "using Pkg; Pkg.activate(joinpath(Pkg.depots()[1], \"config/RemoteHPC\")); Pkg.update(\"RemoteHPC\")") + if res.exitcode != 0 + finish!(spinner, ErrorException("Error while updating Server $(s.name):\nstdout: $(res.stdout) stderr: $(res.stderr) exitcode: $(res.exitcode)")) + end end - else - @info "New version $(version(s))" - end + + next!(spinner) + + if alive + push!(spinner, "Restarting server...") + start(s) + end + + finish!(spinner) + + if v !== nothing + newver = version(s) + if v == newver + @warn "Version did not update, is RemoteHPC installed from a fixed path on the Server?" + else + @info "Version $v -> $newver" + end + else + @info "New version $(version(s))" + end + end end Base.joinpath(s::Server, p...) = joinpath(s.jobdir, p...) diff --git a/test/runtests.jl b/test/runtests.jl index 62e54db..8c5e9d2 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -159,3 +159,13 @@ end @test !ispath(s, tname) @test !ispath(s, tname2) end + +@testset "spinner" begin + title = "test" + steps = ["test1", "test2"] + RemoteHPC.StepSpinner(title, steps,dt=0.1) do s + RemoteHPC.push!(s, "blabla") + RemoteHPC.next!(s) + RemoteHPC.push!(s, "test") + end +end