diff options
Diffstat (limited to 'tensorflow/tools/dist_test/scripts')
4 files changed, 155 insertions, 74 deletions
diff --git a/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh b/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh index b0e07588e8..69c459ec8c 100755 --- a/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh +++ b/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh @@ -167,10 +167,10 @@ fi "${KUBECTL_BIN}" create -f "${K8S_YAML}" # Wait for external IP of worker services to become available -get_tf_worker_external_ip() { - # Usage: gen_tf_worker_external_ip <WORKER_INDEX> - # E.g., gen_tf_worker_external_ip 2 - echo $("${KUBECTL_BIN}" get svc | grep "^tf-worker${1}" | \ +get_tf_external_ip() { + # Usage: gen_tf_worker_external_ip <JOB_NAME> <TASK_INDEX> + # E.g., gen_tf_worker_external_ip ps 2 + echo $("${KUBECTL_BIN}" get svc | grep "^tf-${1}${2}" | \ awk '{print $3}' | grep -E "[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+") } @@ -187,16 +187,16 @@ if [[ ${IS_LOCAL_CLUSTER} == "0" ]]; then "of tf-worker0 service to emerge" fi - EXTERN_IPS="" + WORKER_EXTERN_IPS="" WORKER_INDEX=0 - N_AVAILABLE_EXTERNAL_IPS=0 + N_AVAILABLE_WORKER_EXTERNAL_IPS=0 while true; do - SVC_EXTERN_IP=$(get_tf_worker_external_ip ${WORKER_INDEX}) + SVC_EXTERN_IP=$(get_tf_external_ip worker ${WORKER_INDEX}) if [[ ! -z "${SVC_EXTERN_IP}" ]]; then - EXTERN_IPS="${EXTERN_IPS} ${SVC_EXTERN_IP}" + WORKER_EXTERN_IPS="${WORKER_EXTERN_IPS} ${SVC_EXTERN_IP}" - ((N_AVAILABLE_EXTERNAL_IPS++)) + ((N_AVAILABLE_WORKER_EXTERNAL_IPS++)) fi ((WORKER_INDEX++)) @@ -205,16 +205,42 @@ if [[ ${IS_LOCAL_CLUSTER} == "0" ]]; then fi done - if [[ ${N_AVAILABLE_EXTERNAL_IPS} == ${NUM_WORKERS} ]]; then + PS_EXTERN_IPS="" + PS_INDEX=0 + N_AVAILABLE_PS_EXTERNAL_IPS=0 + while true; do + SVC_EXTERN_IP=$(get_tf_external_ip ps ${PS_INDEX}) + + if [[ ! -z "${SVC_EXTERN_IP}" ]]; then + PS_EXTERN_IPS="${PS_EXTERN_IPS} ${SVC_EXTERN_IP}" + + ((N_AVAILABLE_PS_EXTERNAL_IPS++)) + fi + + ((PS_INDEX++)) + if [[ ${PS_INDEX} == ${NUM_PARAMETER_SERVERS} ]]; then + break; + fi + done + + if [[ ${N_AVAILABLE_WORKER_EXTERNAL_IPS} == ${NUM_WORKERS} ]] && \ + [[ ${N_AVAILABLE_PS_EXTERNAL_IPS} == ${NUM_PARAMETER_SERVERS} ]]; then break; fi done GRPC_SERVER_URLS="" - for IP in ${EXTERN_IPS}; do + for IP in ${WORKER_EXTERN_IPS}; do GRPC_SERVER_URLS="${GRPC_SERVER_URLS} grpc://${IP}:${GRPC_PORT}" done - echo "GRPC URLs of tf-workers: ${GRPC_SERVER_URLS}" + + GRPC_PS_URLS="" + for IP in ${PS_EXTERN_IPS}; do + GRPC_PS_URLS="${GRPC_PS_URLS} grpc://${IP}:${GRPC_PORT}" + done + + echo "GRPC URLs of tf-worker instances: ${GRPC_SERVER_URLS}" + echo "GRPC URLs of tf-ps instances: ${GRPC_PS_URLS}" else echo "Waiting for tf pods to be all running..." @@ -251,3 +277,4 @@ fi echo "Cluster setup complete." +echo "" diff --git a/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh b/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh index d95f524486..4f2cab22d9 100755 --- a/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh +++ b/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh @@ -19,24 +19,28 @@ # grpc pods and service set up. # # Usage: -# dist_mnist_test.sh [--ps-hosts <PS_HOSTS>] -# [--worker-hosts <WORKER_HOSTS>] -# [--num-gpus <NUM_GPUS>] -# [--sync-replicas] +# dist_mnist_test.sh [--existing_servers (True|False)] +# [--ps_hosts <PS_HOSTS>] +# [--worker_hosts <WORKER_HOSTS>] +# [--num_gpus <NUM_GPUS>] +# [--sync_replicas] # -# --sync-replicas +# --existing_servers +# Use TensorFlow GRPC servers that are already created and running. +# +# --sync_replicas # Use the synchronized-replica mode. The parameter updates from the replicas # (workers) will be aggregated before applied, which avoids stale parameter # updates. # -# ps-hosts/worker-hosts is the list of IP addresses or the GRPC URLs of the ps/worker of +# ps_hosts/worker_hosts is the list of IP addresses or the GRPC URLs of the ps/worker of # the worker sessions, separated with "," # e.g., "localhost:2222,localhost:2223" # -# --num-gpus <NUM_GPUS>: +# --num_gpus <NUM_GPUS>: # Specifies the number of gpus to use # -# NOTES: +# NOTES: # If you have the error "$'\r': command not found" # Please run the command below to remove trailing '\r' character that causes the error: # sed -i 's/\r$//' dist_mnist_test.sh @@ -52,25 +56,33 @@ die() { } if [[ $# == "0" ]]; then - die "Usage: $0 [--ps-hosts <PS_HOSTS>] [--worker-hosts <WORKER_HOSTS>] "\ -"[--num-gpus <NUM_GPUS>] [--sync-replicas]" + die "Usage: $0 [--ps_hosts <PS_HOSTS>] [--worker_hosts <WORKER_HOSTS>] "\ +"[--num_gpus <NUM_GPUS>] [--sync_replicas]" fi # Process additional input arguments SYNC_REPLICAS=0 +N_GPUS=0 +EXISTING_SERVERS=False while true; do - if [[ "$1" == "--ps-hosts" ]]; then + if [[ "$1" == "--ps_hosts" ]]; then PS_HOSTS=$2 - elif [[ "$1" == "--worker-hosts" ]]; then + elif [[ "$1" == "--worker_hosts" ]]; then WORKER_HOSTS=$2 - elif [[ "$1" == "--num-gpus" ]]; then + elif [[ "$1" == "--existing_servers" ]]; then + EXISTING_SERVERS=$2 + if [[ "${EXISTING_SERVERS}" != "True" ]] && \ + [[ "${EXISTING_SERVERS}" != "False" ]]; then + die "Invalid value for --existing_servers: should be (True|False)" + fi + elif [[ "$1" == "--num_gpus" ]]; then N_GPUS=$2 - elif [[ "$1" == "--sync-replicas" ]]; then + elif [[ "$1" == "--sync_replicas" ]]; then SYNC_REPLICAS="1" - die "ERROR: --sync-replicas (synchronized-replicas) mode is not fully "\ + die "ERROR: --sync_replicas (synchronized-replicas) mode is not fully "\ "supported by this test yet." - # TODO(cais): Remove error message once sync-replicas is fully supported + # TODO(cais): Remove error message once sync_replicas is fully supported. fi shift 2 @@ -86,6 +98,7 @@ else SYNC_REPLICAS_FLAG="False" fi +echo "EXISTING_SERVERS = ${EXISTING_SERVERS}" echo "PS_HOSTS = ${PS_HOSTS}" echo "WORKER_HOSTS = ${WORKER_HOSTS}" echo "NUM_GPUS = ${N_GPUS}" @@ -105,6 +118,7 @@ PS_LOG_PREFIX="/tmp/ps" # First, download the data from a single process, to avoid race-condition # during data downloading +# Pre-download data files. timeout ${TIMEOUT} python "${MNIST_REPLICA}" \ --ps_hosts="${PS_HOSTS}" \ --worker_hosts="${WORKER_HOSTS}" \ @@ -123,25 +137,30 @@ PS_ARRAY=$(echo ${PS_HOSTS} | awk -F "," '{for(i=1;i<=NF;i++){printf $i" "}}') # Run a number of ps in parallel. In general, we only set 1 ps. echo "${N_PS} ps process(es) running in parallel..." -IDX=0 -PS=($PS_HOSTS) -while true; do - timeout ${TIMEOUT} python "${MNIST_REPLICA}" \ - --ps_hosts="${PS_HOSTS}" \ - --worker_hosts="${WORKER_HOSTS}" \ - --job_name="ps" \ - --task_index=${IDX} \ - --num_gpus=${N_GPUS} \ - --sync_replicas=${SYNC_REPLICAS_FLAG} \ | tee "${PS_LOG_PREFIX}${IDX}.log" & - echo "PS ${IDX}: " - echo " PS HOST: ${PS_ARRAY[IDX]}" - echo " log file: ${PS_LOG_PREFIX}${IDX}.log" - - ((IDX++)) - if [[ "${IDX}" == "${N_PS}" ]]; then - break - fi -done +if [[ ${EXISTING_SERVERS} == "False" ]]; then + echo "Hello" + # Create parameter servers. + IDX=0 + PS=($PS_HOSTS) + while true; do + python "${MNIST_REPLICA}" \ + --existing_servers="${EXISTING_SERVERS}" \ + --ps_hosts="${PS_HOSTS}" \ + --worker_hosts="${WORKER_HOSTS}" \ + --job_name="ps" \ + --task_index=${IDX} \ + --num_gpus=${N_GPUS} \ + --sync_replicas=${SYNC_REPLICAS_FLAG} | tee "${PS_LOG_PREFIX}${IDX}.log" & + echo "PS ${IDX}: " + echo " PS HOST: ${PS_ARRAY[IDX]}" + echo " log file: ${PS_LOG_PREFIX}${IDX}.log" + + ((IDX++)) + if [[ "${IDX}" == "${N_PS}" ]]; then + break + fi + done +fi # Get N_WORKERS by WORKER_HOSTS @@ -155,12 +174,14 @@ INDICES="" IDX=0 while true; do timeout ${TIMEOUT} python "${MNIST_REPLICA}" \ + --existing_servers="${EXISTING_SERVERS}" \ --ps_hosts="${PS_HOSTS}" \ --worker_hosts="${WORKER_HOSTS}" \ --job_name="worker" \ --task_index=${IDX} \ --num_gpus=${N_GPUS} \ - --sync_replicas=${SYNC_REPLICAS_FLAG} \ | tee "${WKR_LOG_PREFIX}${IDX}.log" & + --train_steps=500 \ + --sync_replicas=${SYNC_REPLICAS_FLAG} | tee "${WKR_LOG_PREFIX}${IDX}.log" & echo "Worker ${IDX}: " echo " WORKER HOST: ${WORKER_ARRAY[IDX]}" echo " log file: ${WKR_LOG_PREFIX}${IDX}.log" @@ -171,9 +192,8 @@ while true; do if [[ "${IDX}" == "${N_WORKERS}" ]]; then break fi -done - +done # Poll until all final validation cross entropy values become available or diff --git a/tensorflow/tools/dist_test/scripts/dist_test.sh b/tensorflow/tools/dist_test/scripts/dist_test.sh index 1d60aa518f..080ce1df5f 100755 --- a/tensorflow/tools/dist_test/scripts/dist_test.sh +++ b/tensorflow/tools/dist_test/scripts/dist_test.sh @@ -25,25 +25,25 @@ # TensorFlow ops. # # Usage: -# dist_test.sh [--setup-cluster-only] -# [--model-name (MNIST | CENSUS_WIDENDEEP)] -# [--num-workers <NUM_WORKERS>] -# [--num-parameter-servers <NUM_PARAMETER_SERVERS>] -# [--sync-replicas] +# dist_test.sh [--setup_cluster_only] +# [--model_name (MNIST | CENSUS_WIDENDEEP)] +# [--num_workers <NUM_WORKERS>] +# [--num_parameter_servers <NUM_PARAMETER_SERVERS>] +# [--sync_replicas] # -# --setup-cluster-only: +# --setup_cluster_only: # Lets the script only set up the k8s container network # -# --model-name +# --model_name # Name of the model to test. Default is MNIST. # # --num-workers <NUM_WORKERS>: # Specifies the number of worker pods to start # -# --num-parameter-server <NUM_PARAMETER_SERVERS>: +# --num_parameter_servers <NUM_PARAMETER_SERVERS>: # Specifies the number of parameter servers to start # -# --sync-replicas +# --sync_replicas # Use the synchronized-replica mode. The parameter updates from the replicas # (workers) will be aggregated before applied, which avoids stale parameter # updates. @@ -72,15 +72,15 @@ SYNC_REPLICAS=0 SETUP_CLUSTER_ONLY=0 while true; do - if [[ "$1" == "--model-name" ]]; then + if [[ "$1" == "--model_name" ]]; then MODEL_NAME=$2 - elif [[ "$1" == "--num-workers" ]]; then + elif [[ "$1" == "--num_workers" ]]; then NUM_WORKERS=$2 - elif [[ "$1" == "--num-parameter-servers" ]]; then + elif [[ "$1" == "--num_parameter_servers" ]]; then NUM_PARAMETER_SERVERS=$2 - elif [[ "$1" == "--sync-replicas" ]]; then + elif [[ "$1" == "--sync_replicas" ]]; then SYNC_REPLICAS=1 - elif [[ "$1" == "--setup-cluster-only" ]]; then + elif [[ "$1" == "--setup_cluster_only" ]]; then SETUP_CLUSTER_ONLY=1 fi shift @@ -132,17 +132,32 @@ else tee "${TMP}" || \ die "Creation of TensorFlow k8s cluster FAILED" - GRPC_SERVER_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-workers: .*" | \ - sed -e 's/GRPC URLs of tf-workers://g') + GRPC_SERVER_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-worker instances: .*" | \ + sed -e 's/GRPC URLs of tf-worker instances://g') + + GRPC_PS_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-ps instances: .*" | \ + sed -e 's/GRPC URLs of tf-ps instances://g') if [[ $(echo ${GRPC_SERVER_URLS} | wc -w) != ${NUM_WORKERS} ]]; then die "FAILED to determine GRPC server URLs of all workers" fi + if [[ $(echo ${GRPC_PS_URLS} | wc -w) != ${NUM_PARAMETER_SERVERS} ]]; then + die "FAILED to determine GRPC server URLs of all parameter servers" + fi + + WORKER_HOSTS=$(echo "${GRPC_SERVER_URLS}" | sed -e 's/^[[:space:]]*//' | \ + sed -e 's/grpc:\/\///g' | sed -e 's/ /,/g') + PS_HOSTS=$(echo "${GRPC_PS_URLS}" | sed -e 's/^[[:space:]]*//' | \ + sed -e 's/grpc:\/\///g' | sed -e 's/ /,/g') + + echo "WORKER_HOSTS = ${WORKER_HOSTS}" + echo "PS_HOSTS = ${PS_HOSTS}" + rm -f ${TMP} if [[ ${SETUP_CLUSTER_ONLY} == "1" ]]; then echo "Skipping testing of distributed runtime due to "\ -"option flag --setup-cluster-only" +"option flag --setup_cluster_only" exit 0 fi fi @@ -158,17 +173,21 @@ test_MNIST() { return 1 fi - echo "Performing distributed MNIST training through grpc sessions @ "\ + echo "Performing distributed MNIST training through worker grpc sessions @ "\ "${GRPC_SERVER_URLS}..." + echo "and ps grpc sessions @ ${GRPC_PS_URLS}" + SYNC_REPLICAS_FLAG="" if [[ ${SYNC_REPLICAS} == "1" ]]; then - SYNC_REPLICAS_FLAG="--sync-replicas" + SYNC_REPLICAS_FLAG="--sync_replicas" fi - "${MNIST_DIST_TEST_BIN}" "${GRPC_SERVER_URLS}" \ - --num-workers "${NUM_WORKERS}" \ - --num-parameter-servers "${NUM_PARAMETER_SERVERS}" \ + "${MNIST_DIST_TEST_BIN}" \ + --existing_servers True \ + --ps_hosts "${PS_HOSTS}" \ + --worker_hosts "${WORKER_HOSTS}" \ + --num_gpus 0 \ ${SYNC_REPLICAS_FLAG} if [[ $? == "0" ]]; then diff --git a/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py b/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py index 3a427a1d4e..854c6b832a 100755 --- a/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py +++ b/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py @@ -136,6 +136,19 @@ spec: selector: tf-ps: "{param_server_id}" """) +PARAM_LB_SVC = ("""apiVersion: v1 +kind: Service +metadata: + name: tf-ps{param_server_id} + labels: + tf-ps: "{param_server_id}" +spec: + type: LoadBalancer + ports: + - port: {port} + selector: + tf-ps: "{param_server_id}" +""") def main(): @@ -218,8 +231,10 @@ def GenerateConfig(num_workers, num_param_servers, port)) config += '---\n' - config += PARAM_SERVER_SVC.format(port=port, - param_server_id=param_server) + if request_load_balancer: + config += PARAM_LB_SVC.format(port=port, param_server_id=param_server) + else: + config += PARAM_SERVER_SVC.format(port=port, param_server_id=param_server) config += '---\n' return config |