about | summary | refs | log | tree | commit | diff | homepage
path: root/tensorflow/tools/dist_test/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'tensorflow/tools/dist_test/scripts')
-rwxr-xr-x tensorflow/tools/dist_test/scripts/create_tf_cluster.sh 51
-rwxr-xr-x tensorflow/tools/dist_test/scripts/dist_mnist_test.sh 96
-rwxr-xr-x tensorflow/tools/dist_test/scripts/dist_test.sh 63
-rwxr-xr-x tensorflow/tools/dist_test/scripts/k8s_tensorflow.py 19
4 files changed, 155 insertions, 74 deletions
diff --git a/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh b/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh
index b0e07588e8..69c459ec8c 100755
--- a/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh
+++ b/tensorflow/tools/dist_test/scripts/create_tf_cluster.sh
@@ -167,10 +167,10 @@ fi
"${KUBECTL_BIN}" create -f "${K8S_YAML}"
# Wait for external IP of worker services to become available
-get_tf_worker_external_ip() {
- # Usage: gen_tf_worker_external_ip <WORKER_INDEX>
- # E.g., gen_tf_worker_external_ip 2
- echo $("${KUBECTL_BIN}" get svc | grep "^tf-worker${1}" | \
+get_tf_external_ip() {
+ # Usage: get_tf_external_ip <JOB_NAME> <TASK_INDEX>
+ # E.g., get_tf_external_ip ps 2
+ echo $("${KUBECTL_BIN}" get svc | grep "^tf-${1}${2}" | \
awk '{print $3}' | grep -E "[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+")
}
@@ -187,16 +187,16 @@ if [[ ${IS_LOCAL_CLUSTER} == "0" ]]; then
"of tf-worker0 service to emerge"
fi
- EXTERN_IPS=""
+ WORKER_EXTERN_IPS=""
WORKER_INDEX=0
- N_AVAILABLE_EXTERNAL_IPS=0
+ N_AVAILABLE_WORKER_EXTERNAL_IPS=0
while true; do
- SVC_EXTERN_IP=$(get_tf_worker_external_ip ${WORKER_INDEX})
+ SVC_EXTERN_IP=$(get_tf_external_ip worker ${WORKER_INDEX})
if [[ ! -z "${SVC_EXTERN_IP}" ]]; then
- EXTERN_IPS="${EXTERN_IPS} ${SVC_EXTERN_IP}"
+ WORKER_EXTERN_IPS="${WORKER_EXTERN_IPS} ${SVC_EXTERN_IP}"
- ((N_AVAILABLE_EXTERNAL_IPS++))
+ ((N_AVAILABLE_WORKER_EXTERNAL_IPS++))
fi
((WORKER_INDEX++))
@@ -205,16 +205,42 @@ if [[ ${IS_LOCAL_CLUSTER} == "0" ]]; then
fi
done
- if [[ ${N_AVAILABLE_EXTERNAL_IPS} == ${NUM_WORKERS} ]]; then
+ PS_EXTERN_IPS=""
+ PS_INDEX=0
+ N_AVAILABLE_PS_EXTERNAL_IPS=0
+ while true; do
+ SVC_EXTERN_IP=$(get_tf_external_ip ps ${PS_INDEX})
+
+ if [[ ! -z "${SVC_EXTERN_IP}" ]]; then
+ PS_EXTERN_IPS="${PS_EXTERN_IPS} ${SVC_EXTERN_IP}"
+
+ ((N_AVAILABLE_PS_EXTERNAL_IPS++))
+ fi
+
+ ((PS_INDEX++))
+ if [[ ${PS_INDEX} == ${NUM_PARAMETER_SERVERS} ]]; then
+ break;
+ fi
+ done
+
+ if [[ ${N_AVAILABLE_WORKER_EXTERNAL_IPS} == ${NUM_WORKERS} ]] && \
+ [[ ${N_AVAILABLE_PS_EXTERNAL_IPS} == ${NUM_PARAMETER_SERVERS} ]]; then
break;
fi
done
GRPC_SERVER_URLS=""
- for IP in ${EXTERN_IPS}; do
+ for IP in ${WORKER_EXTERN_IPS}; do
GRPC_SERVER_URLS="${GRPC_SERVER_URLS} grpc://${IP}:${GRPC_PORT}"
done
- echo "GRPC URLs of tf-workers: ${GRPC_SERVER_URLS}"
+
+ GRPC_PS_URLS=""
+ for IP in ${PS_EXTERN_IPS}; do
+ GRPC_PS_URLS="${GRPC_PS_URLS} grpc://${IP}:${GRPC_PORT}"
+ done
+
+ echo "GRPC URLs of tf-worker instances: ${GRPC_SERVER_URLS}"
+ echo "GRPC URLs of tf-ps instances: ${GRPC_PS_URLS}"
else
echo "Waiting for tf pods to be all running..."
@@ -251,3 +277,4 @@ fi
echo "Cluster setup complete."
+echo ""
diff --git a/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh b/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh
index d95f524486..4f2cab22d9 100755
--- a/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh
+++ b/tensorflow/tools/dist_test/scripts/dist_mnist_test.sh
@@ -19,24 +19,28 @@
# grpc pods and service set up.
#
# Usage:
-# dist_mnist_test.sh [--ps-hosts <PS_HOSTS>]
-# [--worker-hosts <WORKER_HOSTS>]
-# [--num-gpus <NUM_GPUS>]
-# [--sync-replicas]
+# dist_mnist_test.sh [--existing_servers (True|False)]
+# [--ps_hosts <PS_HOSTS>]
+# [--worker_hosts <WORKER_HOSTS>]
+# [--num_gpus <NUM_GPUS>]
+# [--sync_replicas]
#
-# --sync-replicas
+# --existing_servers
+# Use TensorFlow GRPC servers that are already created and running.
+#
+# --sync_replicas
# Use the synchronized-replica mode. The parameter updates from the replicas
# (workers) will be aggregated before applied, which avoids stale parameter
# updates.
#
-# ps-hosts/worker-hosts is the list of IP addresses or the GRPC URLs of the ps/worker of
+# ps_hosts/worker_hosts is the list of IP addresses or the GRPC URLs of the ps/worker of
# the worker sessions, separated with ","
# e.g., "localhost:2222,localhost:2223"
#
-# --num-gpus <NUM_GPUS>:
+# --num_gpus <NUM_GPUS>:
# Specifies the number of gpus to use
#
-# NOTES:
+# NOTES:
# If you have the error "$'\r': command not found"
# Please run the command below to remove trailing '\r' character that causes the error:
# sed -i 's/\r$//' dist_mnist_test.sh
@@ -52,25 +56,33 @@ die() {
}
if [[ $# == "0" ]]; then
- die "Usage: $0 [--ps-hosts <PS_HOSTS>] [--worker-hosts <WORKER_HOSTS>] "\
-"[--num-gpus <NUM_GPUS>] [--sync-replicas]"
+ die "Usage: $0 [--ps_hosts <PS_HOSTS>] [--worker_hosts <WORKER_HOSTS>] "\
+"[--num_gpus <NUM_GPUS>] [--sync_replicas]"
fi
# Process additional input arguments
SYNC_REPLICAS=0
+N_GPUS=0
+EXISTING_SERVERS=False
while true; do
- if [[ "$1" == "--ps-hosts" ]]; then
+ if [[ "$1" == "--ps_hosts" ]]; then
PS_HOSTS=$2
- elif [[ "$1" == "--worker-hosts" ]]; then
+ elif [[ "$1" == "--worker_hosts" ]]; then
WORKER_HOSTS=$2
- elif [[ "$1" == "--num-gpus" ]]; then
+ elif [[ "$1" == "--existing_servers" ]]; then
+ EXISTING_SERVERS=$2
+ if [[ "${EXISTING_SERVERS}" != "True" ]] && \
+ [[ "${EXISTING_SERVERS}" != "False" ]]; then
+ die "Invalid value for --existing_servers: should be (True|False)"
+ fi
+ elif [[ "$1" == "--num_gpus" ]]; then
N_GPUS=$2
- elif [[ "$1" == "--sync-replicas" ]]; then
+ elif [[ "$1" == "--sync_replicas" ]]; then
SYNC_REPLICAS="1"
- die "ERROR: --sync-replicas (synchronized-replicas) mode is not fully "\
+ die "ERROR: --sync_replicas (synchronized-replicas) mode is not fully "\
"supported by this test yet."
- # TODO(cais): Remove error message once sync-replicas is fully supported
+ # TODO(cais): Remove error message once sync_replicas is fully supported.
fi
shift 2
@@ -86,6 +98,7 @@ else
SYNC_REPLICAS_FLAG="False"
fi
+echo "EXISTING_SERVERS = ${EXISTING_SERVERS}"
echo "PS_HOSTS = ${PS_HOSTS}"
echo "WORKER_HOSTS = ${WORKER_HOSTS}"
echo "NUM_GPUS = ${N_GPUS}"
@@ -105,6 +118,7 @@ PS_LOG_PREFIX="/tmp/ps"
# First, download the data from a single process, to avoid race-condition
# during data downloading
+# Pre-download data files.
timeout ${TIMEOUT} python "${MNIST_REPLICA}" \
--ps_hosts="${PS_HOSTS}" \
--worker_hosts="${WORKER_HOSTS}" \
@@ -123,25 +137,30 @@ PS_ARRAY=$(echo ${PS_HOSTS} | awk -F "," '{for(i=1;i<=NF;i++){printf $i" "}}')
# Run a number of ps in parallel. In general, we only set 1 ps.
echo "${N_PS} ps process(es) running in parallel..."
-IDX=0
-PS=($PS_HOSTS)
-while true; do
- timeout ${TIMEOUT} python "${MNIST_REPLICA}" \
- --ps_hosts="${PS_HOSTS}" \
- --worker_hosts="${WORKER_HOSTS}" \
- --job_name="ps" \
- --task_index=${IDX} \
- --num_gpus=${N_GPUS} \
- --sync_replicas=${SYNC_REPLICAS_FLAG} \ | tee "${PS_LOG_PREFIX}${IDX}.log" &
- echo "PS ${IDX}: "
- echo " PS HOST: ${PS_ARRAY[IDX]}"
- echo " log file: ${PS_LOG_PREFIX}${IDX}.log"
-
- ((IDX++))
- if [[ "${IDX}" == "${N_PS}" ]]; then
- break
- fi
-done
+if [[ ${EXISTING_SERVERS} == "False" ]]; then
+ echo "Hello"
+ # Create parameter servers.
+ IDX=0
+ PS=($PS_HOSTS)
+ while true; do
+ python "${MNIST_REPLICA}" \
+ --existing_servers="${EXISTING_SERVERS}" \
+ --ps_hosts="${PS_HOSTS}" \
+ --worker_hosts="${WORKER_HOSTS}" \
+ --job_name="ps" \
+ --task_index=${IDX} \
+ --num_gpus=${N_GPUS} \
+ --sync_replicas=${SYNC_REPLICAS_FLAG} | tee "${PS_LOG_PREFIX}${IDX}.log" &
+ echo "PS ${IDX}: "
+ echo " PS HOST: ${PS_ARRAY[IDX]}"
+ echo " log file: ${PS_LOG_PREFIX}${IDX}.log"
+
+ ((IDX++))
+ if [[ "${IDX}" == "${N_PS}" ]]; then
+ break
+ fi
+ done
+fi
# Get N_WORKERS by WORKER_HOSTS
@@ -155,12 +174,14 @@ INDICES=""
IDX=0
while true; do
timeout ${TIMEOUT} python "${MNIST_REPLICA}" \
+ --existing_servers="${EXISTING_SERVERS}" \
--ps_hosts="${PS_HOSTS}" \
--worker_hosts="${WORKER_HOSTS}" \
--job_name="worker" \
--task_index=${IDX} \
--num_gpus=${N_GPUS} \
- --sync_replicas=${SYNC_REPLICAS_FLAG} \ | tee "${WKR_LOG_PREFIX}${IDX}.log" &
+ --train_steps=500 \
+ --sync_replicas=${SYNC_REPLICAS_FLAG} | tee "${WKR_LOG_PREFIX}${IDX}.log" &
echo "Worker ${IDX}: "
echo " WORKER HOST: ${WORKER_ARRAY[IDX]}"
echo " log file: ${WKR_LOG_PREFIX}${IDX}.log"
@@ -171,9 +192,8 @@ while true; do
if [[ "${IDX}" == "${N_WORKERS}" ]]; then
break
fi
-done
-
+done
# Poll until all final validation cross entropy values become available or
diff --git a/tensorflow/tools/dist_test/scripts/dist_test.sh b/tensorflow/tools/dist_test/scripts/dist_test.sh
index 1d60aa518f..080ce1df5f 100755
--- a/tensorflow/tools/dist_test/scripts/dist_test.sh
+++ b/tensorflow/tools/dist_test/scripts/dist_test.sh
@@ -25,25 +25,25 @@
# TensorFlow ops.
#
# Usage:
-# dist_test.sh [--setup-cluster-only]
-# [--model-name (MNIST | CENSUS_WIDENDEEP)]
-# [--num-workers <NUM_WORKERS>]
-# [--num-parameter-servers <NUM_PARAMETER_SERVERS>]
-# [--sync-replicas]
+# dist_test.sh [--setup_cluster_only]
+# [--model_name (MNIST | CENSUS_WIDENDEEP)]
+# [--num_workers <NUM_WORKERS>]
+# [--num_parameter_servers <NUM_PARAMETER_SERVERS>]
+# [--sync_replicas]
#
-# --setup-cluster-only:
+# --setup_cluster_only:
# Lets the script only set up the k8s container network
#
-# --model-name
+# --model_name
# Name of the model to test. Default is MNIST.
#
# --num-workers <NUM_WORKERS>:
# Specifies the number of worker pods to start
#
-# --num-parameter-server <NUM_PARAMETER_SERVERS>:
+# --num_parameter_servers <NUM_PARAMETER_SERVERS>:
# Specifies the number of parameter servers to start
#
-# --sync-replicas
+# --sync_replicas
# Use the synchronized-replica mode. The parameter updates from the replicas
# (workers) will be aggregated before applied, which avoids stale parameter
# updates.
@@ -72,15 +72,15 @@ SYNC_REPLICAS=0
SETUP_CLUSTER_ONLY=0
while true; do
- if [[ "$1" == "--model-name" ]]; then
+ if [[ "$1" == "--model_name" ]]; then
MODEL_NAME=$2
- elif [[ "$1" == "--num-workers" ]]; then
+ elif [[ "$1" == "--num_workers" ]]; then
NUM_WORKERS=$2
- elif [[ "$1" == "--num-parameter-servers" ]]; then
+ elif [[ "$1" == "--num_parameter_servers" ]]; then
NUM_PARAMETER_SERVERS=$2
- elif [[ "$1" == "--sync-replicas" ]]; then
+ elif [[ "$1" == "--sync_replicas" ]]; then
SYNC_REPLICAS=1
- elif [[ "$1" == "--setup-cluster-only" ]]; then
+ elif [[ "$1" == "--setup_cluster_only" ]]; then
SETUP_CLUSTER_ONLY=1
fi
shift
@@ -132,17 +132,32 @@ else
tee "${TMP}" || \
die "Creation of TensorFlow k8s cluster FAILED"
- GRPC_SERVER_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-workers: .*" | \
- sed -e 's/GRPC URLs of tf-workers://g')
+ GRPC_SERVER_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-worker instances: .*" | \
+ sed -e 's/GRPC URLs of tf-worker instances://g')
+
+ GRPC_PS_URLS=$(cat ${TMP} | grep "GRPC URLs of tf-ps instances: .*" | \
+ sed -e 's/GRPC URLs of tf-ps instances://g')
if [[ $(echo ${GRPC_SERVER_URLS} | wc -w) != ${NUM_WORKERS} ]]; then
die "FAILED to determine GRPC server URLs of all workers"
fi
+ if [[ $(echo ${GRPC_PS_URLS} | wc -w) != ${NUM_PARAMETER_SERVERS} ]]; then
+ die "FAILED to determine GRPC server URLs of all parameter servers"
+ fi
+
+ WORKER_HOSTS=$(echo "${GRPC_SERVER_URLS}" | sed -e 's/^[[:space:]]*//' | \
+ sed -e 's/grpc:\/\///g' | sed -e 's/ /,/g')
+ PS_HOSTS=$(echo "${GRPC_PS_URLS}" | sed -e 's/^[[:space:]]*//' | \
+ sed -e 's/grpc:\/\///g' | sed -e 's/ /,/g')
+
+ echo "WORKER_HOSTS = ${WORKER_HOSTS}"
+ echo "PS_HOSTS = ${PS_HOSTS}"
+
rm -f ${TMP}
if [[ ${SETUP_CLUSTER_ONLY} == "1" ]]; then
echo "Skipping testing of distributed runtime due to "\
-"option flag --setup-cluster-only"
+"option flag --setup_cluster_only"
exit 0
fi
fi
@@ -158,17 +173,21 @@ test_MNIST() {
return 1
fi
- echo "Performing distributed MNIST training through grpc sessions @ "\
+ echo "Performing distributed MNIST training through worker grpc sessions @ "\
"${GRPC_SERVER_URLS}..."
+ echo "and ps grpc sessions @ ${GRPC_PS_URLS}"
+
SYNC_REPLICAS_FLAG=""
if [[ ${SYNC_REPLICAS} == "1" ]]; then
- SYNC_REPLICAS_FLAG="--sync-replicas"
+ SYNC_REPLICAS_FLAG="--sync_replicas"
fi
- "${MNIST_DIST_TEST_BIN}" "${GRPC_SERVER_URLS}" \
- --num-workers "${NUM_WORKERS}" \
- --num-parameter-servers "${NUM_PARAMETER_SERVERS}" \
+ "${MNIST_DIST_TEST_BIN}" \
+ --existing_servers True \
+ --ps_hosts "${PS_HOSTS}" \
+ --worker_hosts "${WORKER_HOSTS}" \
+ --num_gpus 0 \
${SYNC_REPLICAS_FLAG}
if [[ $? == "0" ]]; then
diff --git a/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py b/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py
index 3a427a1d4e..854c6b832a 100755
--- a/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py
+++ b/tensorflow/tools/dist_test/scripts/k8s_tensorflow.py
@@ -136,6 +136,19 @@ spec:
selector:
tf-ps: "{param_server_id}"
""")
+PARAM_LB_SVC = ("""apiVersion: v1
+kind: Service
+metadata:
+ name: tf-ps{param_server_id}
+ labels:
+ tf-ps: "{param_server_id}"
+spec:
+ type: LoadBalancer
+ ports:
+ - port: {port}
+ selector:
+ tf-ps: "{param_server_id}"
+""")
def main():
@@ -218,8 +231,10 @@ def GenerateConfig(num_workers,
num_param_servers,
port))
config += '---\n'
- config += PARAM_SERVER_SVC.format(port=port,
- param_server_id=param_server)
+ if request_load_balancer:
+ config += PARAM_LB_SVC.format(port=port, param_server_id=param_server)
+ else:
+ config += PARAM_SERVER_SVC.format(port=port, param_server_id=param_server)
config += '---\n'
return config