diff --git a/compiler/compile-config/compile-config-2.02-learning_inference b/compiler/compile-config/compile-config-2.02-learning_inference index 7702c4968..74fdfda3e 100755 --- a/compiler/compile-config/compile-config-2.02-learning_inference +++ b/compiler/compile-config/compile-config-2.02-learning_inference @@ -58,6 +58,11 @@ if .deepdive_.execution.processes | has("process/grounding/combine_factorgraph") $DEEPDIVE_SAMPLER_ARGS \\ # + \(if $deepdive.sampler.partitions > 1 then " + # make sure no stale inference result is laying around unless inference is done right after learning above + rm -f \"$results_dir\"/inference_result.out.text + " else "" end) + # TODO maybe this database interaction for the weights after # learning each shard is unnecessary if we have the sampler checkpoint # them directly, and we could also keep max two copies @@ -191,10 +196,7 @@ if .deepdive_.execution.processes | has("process/grounding/combine_factorgraph") for pid in $DEEPDIVE_FACTORGRAPH_SHARDS; do cd \"$DEEPDIVE_RESULTS_DIR\"/$pid cat inference_result.out.text | - # restoring shard ID to the vids - awk -v SHARD_BASE=$(($pid << 48)) ' - {printf \"%s\\t%s\\t%s\\n\", SHARD_BASE + $1, $2, $3} - ' | + pid=$pid deepdive env restore_partitioned_vids | DEEPDIVE_LOAD_FORMAT=tsv \\ deepdive load \(deepdiveInferenceResultVariablesTable | @sh) /dev/stdin done diff --git a/database/db-driver/greenplum/db-unload b/database/db-driver/greenplum/db-unload index ef1f02f9c..ee7883563 100755 --- a/database/db-driver/greenplum/db-unload +++ b/database/db-driver/greenplum/db-unload @@ -6,15 +6,16 @@ # parallel connections, which MAY RESULT IN INCORRECT RESULT UNLESS QUERY IS # TOTALLY ORDERED BY COLUMNS WITH NO DUPLICATE VALUES! # -# GPFDIST_PORT_BASE (defaults to 10000) is the base port number for the gpfdist -# processes. +# GPFDIST_PORT_BASE is the base port number for the gpfdist processes. 
+# (defaults to a random port in range 10000-59900) # GPFDIST_MAX_LENGTH (defaults to 1MiB) may need to be increased for larger # records to be unloaded through gpfdist. # GPFDIST_LOG_PATH can be set to a path prefix for logging. ## set -eu -: ${GPFDIST_DISABLE:=} ${GPFDIST_PORT_BASE:=10000} ${GPFDIST_MAX_LENGTH:=$((2**20))} ${GPFDIST_LOG_PATH:=} +: ${GPFDIST_DISABLE:=} ${GPFDIST_MAX_LENGTH:=$((2**20))} ${GPFDIST_LOG_PATH:=} +: ${GPFDIST_PORT_BASE:=$(( 100*($RANDOM % 500 + 100) ))} # randomized default base port: a multiple of 100 in 10000-59900 [[ $# -gt 0 ]] || usage "$0" "Missing QUERY" query=$1; shift @@ -70,12 +71,14 @@ case $format in sinkDir=$(dirname "$sink") sinkName=$(basename "$sink") mkdir -p "$sinkDir" spawn_gpfdist() { + # skip over ports already taken: bump $port while nc -z can connect to $sinkHost:$port + while nc -z "$sinkHost" "$port"; do let ++port; done gpfdist -p $port \ ${GPFDIST_MAX_LENGTH:+-m $GPFDIST_MAX_LENGTH} \ ${GPFDIST_LOG_PATH:+-l ${GPFDIST_LOG_PATH}.$i.log} &>/dev/null \ "$@" & echo $! >>"$tmpDir"/gpfdist-$i.pid - # TODO cope with gpfdist failure, e.g., port number collision + # TODO cope with gpfdist failure, e.g., when another process starts listening on the picked port between the nc probe above and gpfdist binding it } if ${DEEPDIVE_SHOW_PROGRESS:-true}; then spawn_gpfdist -d "$tmpDir" diff --git a/inference/restore_partitioned_vids b/inference/restore_partitioned_vids new file mode 100755 index 000000000..519a7f63f --- /dev/null +++ b/inference/restore_partitioned_vids @@ -0,0 +1,7 @@ +#!/usr/bin/perl -w +use strict; +my $SHARD_BASE = $ENV{pid} << 48; # shard ID from the pid environment variable, shifted above the low 48 bits +while (<>) { # one record per input line (stdin or files named in @ARGV) + my ($vid, @rest) = split(" "); # first whitespace-separated field is the vid; remaining fields pass through + print join("\t", ($SHARD_BASE | $vid), @rest), "\n"; # OR the shard base into the vid and re-emit the record tab-separated +} diff --git a/runner/compute-driver/local/compute-execute b/runner/compute-driver/local/compute-execute index c72e0445f..c23f1b86f 100755 --- a/runner/compute-driver/local/compute-execute +++ b/runner/compute-driver/local/compute-execute @@ -131,7 +131,8 @@ all_finishes_ok() { } done } +# the order of the checks below is important. 
if we don't check unload first and unload fails, we are stuck +[[ ${#pids_unload[@]} -eq 0 ]] || all_finishes_ok "deepdive-unload" "${pids_unload[@]}" [[ ${#pids_command[@]} -eq 0 ]] || all_finishes_ok "command=$(escape4sh "$command")" "${pids_command[@]}" [[ ${#pids_load[@]} -eq 0 ]] || all_finishes_ok "deepdive-load" "${pids_load[@]}" -[[ ${#pids_unload[@]} -eq 0 ]] || all_finishes_ok "deepdive-unload" "${pids_unload[@]}" wait # until everything is done ############################################## diff --git a/stage.sh b/stage.sh index 137dd16a4..be3accf22 100755 --- a/stage.sh +++ b/stage.sh @@ -142,6 +142,7 @@ generate-wrapper-for-libdirs "$STAGE_DIR"/util/sampler-$cmd \ done stage inference/deepdive-model util/ stage inference/run-sampler util/ +stage inference/restore_partitioned_vids util/ # Stanford CoreNLP utilities stage util/nlp/deepdive-corenlp util/