Diffstat (limited to 'tensorflow/python/estimator/canned/boosted_trees.py')
-rw-r--r--  tensorflow/python/estimator/canned/boosted_trees.py | 439
1 file changed, 269 insertions(+), 170 deletions(-)
diff --git a/tensorflow/python/estimator/canned/boosted_trees.py b/tensorflow/python/estimator/canned/boosted_trees.py
index 3c832c7569..3292e2724d 100644
--- a/tensorflow/python/estimator/canned/boosted_trees.py
+++ b/tensorflow/python/estimator/canned/boosted_trees.py
@@ -17,6 +17,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
+import abc
import collections
import functools
@@ -384,6 +385,249 @@ class _StopAtAttemptsHook(session_run_hook.SessionRunHook):
run_context.request_stop()
+def _get_max_splits(tree_hparams):
+ """Calculates the max possible number of splits based on tree params."""
+ # Maximum number of splits possible in the whole tree = 2**max_depth - 1.
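+ # For example, max_depth = 6 allows at most 2**6 - 1 = 63 splits.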
+ max_splits = (1 << tree_hparams.max_depth) - 1
+ return max_splits
+
+
+class _EnsembleGrower(object):
+ """Abstract base class for different types of ensemble growers.
+
+ Use a concrete subclass to obtain training ops for growing the tree and
+ centering the bias; the ops differ by implementation (for example,
+ in-memory versus accumulator-based distributed training):
+ grower = ...create subclass grower(tree_ensemble, tree_hparams)
+ grow_op = grower.grow_tree(stats_summaries_list, feature_ids_list,
+ last_layer_nodes_range)
+ training_ops.append(grow_op)
+ """
+
+ def __init__(self, tree_ensemble, tree_hparams):
+ """Initializes a grower object.
+
+ Args:
+ tree_ensemble: A TreeEnsemble variable.
+ tree_hparams: A collections.namedtuple of hyperparameters.
+ """
+ self._tree_ensemble = tree_ensemble
+ self._tree_hparams = tree_hparams
+
+ @abc.abstractmethod
+ def center_bias(self, center_bias_var, gradients, hessians):
+ """Centers bias, if ready, based on statistics.
+
+ Args:
+ center_bias_var: A variable that is updated with the decision on whether
+ bias centering should continue.
+ gradients: A rank 2 tensor of gradients.
+ hessians: A rank 2 tensor of hessians.
+
+ Returns:
+ An operation for centering bias.
+ """
+
+ @abc.abstractmethod
+ def grow_tree(self, stats_summaries_list, feature_ids_list,
+ last_layer_nodes_range):
+ """Grows a tree, if ready, based on provided statistics.
+
+ Args:
+ stats_summaries_list: List of stats summary tensors, representing sums of
+ gradients and hessians for each feature bucket.
+ feature_ids_list: A list of lists of feature ids, one list per bucket size.
+ last_layer_nodes_range: A tensor representing ids of the nodes in the
+ current layer, to be split.
+
+ Returns:
+ An op for growing a tree.
+ """
+
+ # ============= Helper methods ===========
+
+ def _center_bias_fn(self, center_bias_var, mean_gradients, mean_hessians):
+ """Updates the ensembles and cache (if needed) with logits prior."""
+ continue_centering = boosted_trees_ops.center_bias(
+ self._tree_ensemble.resource_handle,
+ mean_gradients=mean_gradients,
+ mean_hessians=mean_hessians,
+ l1=self._tree_hparams.l1,
+ l2=self._tree_hparams.l2)
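+ # `continue_centering` is a boolean tensor: True while bias centering
+ # should continue, False once it has converged.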
+ return center_bias_var.assign(continue_centering)
+
+ def _grow_tree_from_stats_summaries(self, stats_summaries_list,
+ feature_ids_list, last_layer_nodes_range):
+ """Updates ensemble based on the best gains from stats summaries."""
+ node_ids_per_feature = []
+ gains_list = []
+ thresholds_list = []
+ left_node_contribs_list = []
+ right_node_contribs_list = []
+ all_feature_ids = []
+ assert len(stats_summaries_list) == len(feature_ids_list)
+
+ max_splits = _get_max_splits(self._tree_hparams)
+
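+ # Collect the best split candidates from every bucket-size group so that
+ # a single update_ensemble call below can pick the winning split per node.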
+ for i, feature_ids in enumerate(feature_ids_list):
+ (numeric_node_ids_per_feature, numeric_gains_list,
+ numeric_thresholds_list, numeric_left_node_contribs_list,
+ numeric_right_node_contribs_list) = (
+ boosted_trees_ops.calculate_best_gains_per_feature(
+ node_id_range=last_layer_nodes_range,
+ stats_summary_list=stats_summaries_list[i],
+ l1=self._tree_hparams.l1,
+ l2=self._tree_hparams.l2,
+ tree_complexity=self._tree_hparams.tree_complexity,
+ min_node_weight=self._tree_hparams.min_node_weight,
+ max_splits=max_splits))
+
+ all_feature_ids += feature_ids
+ node_ids_per_feature += numeric_node_ids_per_feature
+ gains_list += numeric_gains_list
+ thresholds_list += numeric_thresholds_list
+ left_node_contribs_list += numeric_left_node_contribs_list
+ right_node_contribs_list += numeric_right_node_contribs_list
+
+ grow_op = boosted_trees_ops.update_ensemble(
+ # Confirm if local_tree_ensemble or tree_ensemble should be used.
+ self._tree_ensemble.resource_handle,
+ feature_ids=all_feature_ids,
+ node_ids=node_ids_per_feature,
+ gains=gains_list,
+ thresholds=thresholds_list,
+ left_node_contribs=left_node_contribs_list,
+ right_node_contribs=right_node_contribs_list,
+ learning_rate=self._tree_hparams.learning_rate,
+ max_depth=self._tree_hparams.max_depth,
+ pruning_mode=boosted_trees_ops.PruningMode.NO_PRUNING)
+ return grow_op
+
+
+class _InMemoryEnsembleGrower(_EnsembleGrower):
+ """A base class for ensemble growers."""
+
+ def __init__(self, tree_ensemble, tree_hparams):
+ super(_InMemoryEnsembleGrower, self).__init__(
+ tree_ensemble=tree_ensemble, tree_hparams=tree_hparams)
+
+ def center_bias(self, center_bias_var, gradients, hessians):
+ # For in-memory training we already have the full batch of gradients and
+ # hessians, so we can take their means and center the bias immediately.
+ mean_gradients = array_ops.expand_dims(
+ math_ops.reduce_mean(gradients, 0), 0)
+ mean_hessians = array_ops.expand_dims(math_ops.reduce_mean(hessians, 0), 0)
+ return self._center_bias_fn(center_bias_var, mean_gradients, mean_hessians)
+
+ def grow_tree(self, stats_summaries_list, feature_ids_list,
+ last_layer_nodes_range):
+ # For in-memory training the full data arrives in a single batch, so the
+ # tree can be grown immediately.
+ return self._grow_tree_from_stats_summaries(
+ stats_summaries_list, feature_ids_list, last_layer_nodes_range)
+
+
+class _AccumulatorEnsembleGrower(_EnsembleGrower):
+ """A base class for ensemble growers."""
+
+ def __init__(self, tree_ensemble, tree_hparams, stamp_token,
+ n_batches_per_layer, bucket_size_list, is_chief):
+ super(_AccumulatorEnsembleGrower, self).__init__(
+ tree_ensemble=tree_ensemble, tree_hparams=tree_hparams)
+ self._stamp_token = stamp_token
+ self._n_batches_per_layer = n_batches_per_layer
+ self._bucket_size_list = bucket_size_list
+ self._is_chief = is_chief
+
+ def center_bias(self, center_bias_var, gradients, hessians):
+ # Outside of the in-memory case we need to accumulate enough batches
+ # before proceeding with centering the bias.
+
+ # Create an accumulator.
+ bias_dependencies = []
+ bias_accumulator = data_flow_ops.ConditionalAccumulator(
+ dtype=dtypes.float32,
+ # The stats consist of the means of the grads and hessians only.
+ # TODO(nponomareva): this will change for multiclass.
+ shape=[2, 1],
+ shared_name='bias_accumulator')
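+ # Workers apply per-batch [2, 1] means below; take_grad() later returns
+ # their average across all applied batches.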
+
+ grads_and_hess = array_ops.stack([gradients, hessians], axis=0)
+ grads_and_hess = math_ops.reduce_mean(grads_and_hess, axis=1)
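+ # Stacking gives shape [2, batch_size, 1]; the mean over axis 1 yields
+ # [2, 1], matching the accumulator shape above.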
+
+ apply_grad = bias_accumulator.apply_grad(grads_and_hess, self._stamp_token)
+ bias_dependencies.append(apply_grad)
+
+ # Center the bias once enough batches have been processed.
+ with ops.control_dependencies(bias_dependencies):
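+ # Only the chief actually updates the bias; non-chief workers only
+ # contribute their statistics through the accumulator.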
+ if not self._is_chief:
+ return control_flow_ops.no_op()
+
+ def center_bias_from_accumulator():
+ accumulated = array_ops.unstack(bias_accumulator.take_grad(1), axis=0)
+ return self._center_bias_fn(center_bias_var,
+ array_ops.expand_dims(accumulated[0], 0),
+ array_ops.expand_dims(accumulated[1], 0))
+
+ center_bias_op = control_flow_ops.cond(
+ math_ops.greater_equal(bias_accumulator.num_accumulated(),
+ self._n_batches_per_layer),
+ center_bias_from_accumulator,
+ control_flow_ops.no_op,
+ name='wait_until_n_batches_for_bias_accumulated')
+ return center_bias_op
+
+ def grow_tree(self, stats_summaries_list, feature_ids_list,
+ last_layer_nodes_range):
+ # Outside of the in-memory case we need to accumulate enough batches
+ # before proceeding with building a tree layer.
+ max_splits = _get_max_splits(self._tree_hparams)
+
+ # Prepare accumulators.
+ accumulators = []
+ dependencies = []
+ for i, feature_ids in enumerate(feature_ids_list):
+ stats_summaries = stats_summaries_list[i]
+ accumulator = data_flow_ops.ConditionalAccumulator(
+ dtype=dtypes.float32,
+ # The stats consist of grads and hessians (the last dimension).
+ shape=[len(feature_ids), max_splits, self._bucket_size_list[i], 2],
+ shared_name='numeric_stats_summary_accumulator_' + str(i))
+ accumulators.append(accumulator)
+
+ apply_grad = accumulator.apply_grad(
+ array_ops.stack(stats_summaries, axis=0), self._stamp_token)
+ dependencies.append(apply_grad)
+
+ # Grow the tree once enough batches have been accumulated.
+ with ops.control_dependencies(dependencies):
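+ # As with bias centering, only the chief grows the tree; non-chief
+ # workers only feed the accumulators.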
+ if not self._is_chief:
+ return control_flow_ops.no_op()
+
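+ # A layer is ready only once the slowest accumulator has collected at
+ # least n_batches_per_layer batches.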
+ min_accumulated = math_ops.reduce_min(
+ array_ops.stack([acc.num_accumulated() for acc in accumulators]))
+
+ def grow_tree_from_accumulated_summaries_fn():
+ """Updates tree with the best layer from accumulated summaries."""
+ # Take out the accumulated summaries from the accumulator and grow.
+ stats_summaries_list = [
+ array_ops.unstack(accumulator.take_grad(1), axis=0)
+ for accumulator in accumulators
+ ]
+ grow_op = self._grow_tree_from_stats_summaries(
+ stats_summaries_list, feature_ids_list, last_layer_nodes_range)
+ return grow_op
+
+ grow_model = control_flow_ops.cond(
+ math_ops.greater_equal(min_accumulated, self._n_batches_per_layer),
+ grow_tree_from_accumulated_summaries_fn,
+ control_flow_ops.no_op,
+ name='wait_until_n_batches_accumulated')
+ return grow_model
+
+
def _bt_model_fn(
features,
labels,
@@ -441,11 +685,6 @@ def _bt_model_fn(
raise ValueError('train_in_memory is supported only for '
'non-distributed training.')
worker_device = control_flow_ops.no_op().device
- # maximum number of splits possible in the whole tree =2^(D-1)-1
- # TODO(youngheek): perhaps storage could be optimized by storing stats with
- # the dimension max_splits_per_layer, instead of max_splits (for the entire
- # tree).
- max_splits = (1 << tree_hparams.max_depth) - 1
train_op = []
with ops.name_scope(name) as name:
# Prepare.
@@ -543,6 +782,11 @@ def _bt_model_fn(
hessians = gradients_impl.gradients(
gradients, logits, name='Hessians')[0]
+ # TODO(youngheek): perhaps storage could be optimized by storing stats
+ # with the dimension max_splits_per_layer, instead of max_splits (for the
+ # entire tree).
+ max_splits = _get_max_splits(tree_hparams)
+
stats_summaries_list = []
for i, feature_ids in enumerate(feature_ids_list):
num_buckets = bucket_size_list[i]
@@ -559,173 +803,28 @@ def _bt_model_fn(
]
stats_summaries_list.append(summaries)
- # ========= Helper methods for both in and not in memory. ==============
- def grow_tree_from_stats_summaries(stats_summaries_list,
- feature_ids_list):
- """Updates ensemble based on the best gains from stats summaries."""
- node_ids_per_feature = []
- gains_list = []
- thresholds_list = []
- left_node_contribs_list = []
- right_node_contribs_list = []
- all_feature_ids = []
-
- assert len(stats_summaries_list) == len(feature_ids_list)
-
- for i, feature_ids in enumerate(feature_ids_list):
- (numeric_node_ids_per_feature, numeric_gains_list,
- numeric_thresholds_list, numeric_left_node_contribs_list,
- numeric_right_node_contribs_list) = (
- boosted_trees_ops.calculate_best_gains_per_feature(
- node_id_range=last_layer_nodes_range,
- stats_summary_list=stats_summaries_list[i],
- l1=tree_hparams.l1,
- l2=tree_hparams.l2,
- tree_complexity=tree_hparams.tree_complexity,
- min_node_weight=tree_hparams.min_node_weight,
- max_splits=max_splits))
-
- all_feature_ids += feature_ids
- node_ids_per_feature += numeric_node_ids_per_feature
- gains_list += numeric_gains_list
- thresholds_list += numeric_thresholds_list
- left_node_contribs_list += numeric_left_node_contribs_list
- right_node_contribs_list += numeric_right_node_contribs_list
-
- grow_op = boosted_trees_ops.update_ensemble(
- # Confirm if local_tree_ensemble or tree_ensemble should be used.
- tree_ensemble.resource_handle,
- feature_ids=all_feature_ids,
- node_ids=node_ids_per_feature,
- gains=gains_list,
- thresholds=thresholds_list,
- left_node_contribs=left_node_contribs_list,
- right_node_contribs=right_node_contribs_list,
- learning_rate=tree_hparams.learning_rate,
- max_depth=tree_hparams.max_depth,
- pruning_mode=boosted_trees_ops.PruningMode.NO_PRUNING)
- return grow_op
-
- def _center_bias_fn(mean_gradients, mean_hessians):
- """Updates the ensembles and cache (if needed) with logits prior."""
- continue_centering = boosted_trees_ops.center_bias(
- tree_ensemble.resource_handle,
- mean_gradients=mean_gradients,
- mean_hessians=mean_hessians,
- l1=tree_hparams.l1,
- l2=tree_hparams.l2
- )
- return center_bias_var.assign(continue_centering)
-
- # ========= End of helper methods. ==============
-
if train_in_memory and is_single_machine:
- train_op.append(distribute_lib.increment_var(global_step))
-
- mean_gradients = array_ops.expand_dims(
- math_ops.reduce_mean(gradients, 0), 0)
- mean_heassians = array_ops.expand_dims(
- math_ops.reduce_mean(hessians, 0), 0)
-
- train_op.append(
- control_flow_ops.cond(
- center_bias_var,
- lambda: _center_bias_fn(mean_gradients, mean_heassians),
- functools.partial(grow_tree_from_stats_summaries,
- stats_summaries_list, feature_ids_list)))
+ grower = _InMemoryEnsembleGrower(tree_ensemble, tree_hparams)
else:
-
- def center_bias_not_in_mem():
- """Accumulates the data and updates the logits bias, when ready."""
- bias_dependencies = []
-
- bias_accumulator = data_flow_ops.ConditionalAccumulator(
- dtype=dtypes.float32,
- # The stats consist of grads and hessians means only.
- # TODO(nponomareva): this will change for a multiclass
- shape=[2, 1],
- shared_name='bias_accumulator')
-
- grads_and_hess = array_ops.stack([gradients, hessians], axis=0)
- grads_and_hess = math_ops.reduce_mean(grads_and_hess, axis=1)
-
- apply_grad = bias_accumulator.apply_grad(grads_and_hess, stamp_token)
- bias_dependencies.append(apply_grad)
-
- def center_bias_from_accumulator():
- accumulated = array_ops.unstack(
- bias_accumulator.take_grad(1), axis=0)
- return _center_bias_fn(
- array_ops.expand_dims(accumulated[0], 0),
- array_ops.expand_dims(accumulated[1], 0))
-
- with ops.control_dependencies(bias_dependencies):
- if config.is_chief:
- center_bias_op = control_flow_ops.cond(
- math_ops.greater_equal(bias_accumulator.num_accumulated(),
- n_batches_per_layer),
- center_bias_from_accumulator,
- control_flow_ops.no_op,
- name='wait_until_n_batches_for_bias_accumulated')
-
- return center_bias_op
- else:
- return control_flow_ops.no_op()
-
- def grow_not_in_mem():
- """Accumulates the data and grows a layer when ready."""
-
- accumulators = []
- dependencies = []
- for i, feature_ids in enumerate(feature_ids_list):
- stats_summaries = stats_summaries_list[i]
- accumulator = data_flow_ops.ConditionalAccumulator(
- dtype=dtypes.float32,
- # The stats consist of grads and hessians (the last dimension).
- shape=[len(feature_ids), max_splits, bucket_size_list[i], 2],
- shared_name='numeric_stats_summary_accumulator_' + str(i))
- accumulators.append(accumulator)
-
- apply_grad = accumulator.apply_grad(
- array_ops.stack(stats_summaries, axis=0), stamp_token)
- dependencies.append(apply_grad)
-
- def grow_tree_from_accumulated_summaries_fn():
- """Updates tree with the best layer from accumulated summaries."""
- # Take out the accumulated summaries from the accumulator and grow.
- stats_summaries_list = []
-
- stats_summaries_list = [
- array_ops.unstack(accumulator.take_grad(1), axis=0)
- for accumulator in accumulators
- ]
-
- grow_op = grow_tree_from_stats_summaries(stats_summaries_list,
- feature_ids_list)
- return grow_op
-
- with ops.control_dependencies(dependencies):
- if config.is_chief:
- min_accumulated = math_ops.reduce_min(
- array_ops.stack(
- [acc.num_accumulated() for acc in accumulators]))
-
- grow_model = control_flow_ops.cond(
- math_ops.greater_equal(min_accumulated, n_batches_per_layer),
- grow_tree_from_accumulated_summaries_fn,
- control_flow_ops.no_op,
- name='wait_until_n_batches_accumulated')
-
- return grow_model
- else:
- return control_flow_ops.no_op()
-
- update_model = control_flow_ops.cond(
- center_bias_var, center_bias_not_in_mem, grow_not_in_mem)
- train_op.append(update_model)
- with ops.control_dependencies([update_model]):
- increment_global = distribute_lib.increment_var(global_step)
- train_op.append(increment_global)
+ grower = _AccumulatorEnsembleGrower(tree_ensemble, tree_hparams,
+ stamp_token, n_batches_per_layer,
+ bucket_size_list, config.is_chief)
+
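+ # While center_bias_var is True each step keeps centering the bias;
+ # afterwards every step contributes to growing the next tree layer.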
+ update_model = control_flow_ops.cond(
+ center_bias_var,
+ functools.partial(
+ grower.center_bias,
+ center_bias_var,
+ gradients,
+ hessians,
+ ),
+ functools.partial(grower.grow_tree, stats_summaries_list,
+ feature_ids_list, last_layer_nodes_range))
+ train_op.append(update_model)
+
+ with ops.control_dependencies([update_model]):
+ increment_global = distribute_lib.increment_var(global_step)
+ train_op.append(increment_global)
return control_flow_ops.group(train_op, name='train_op')