diff options
author | 2018-07-16 14:38:44 -0700 | |
---|---|---|
committer | 2018-07-16 14:43:01 -0700 | |
commit | f8f0d7f000349ab573f0d912c37ebc3675cc6154 (patch) | |
tree | 9ce4b9e226326fab62eb69bc23ac710c076aaefb /tensorflow/python/estimator/canned | |
parent | 33af29b33f14cc74725ff081d50e6e59247ef546 (diff) |
Refactoring some of the boosted trees code for growing ensemble.
PiperOrigin-RevId: 204809484
Diffstat (limited to 'tensorflow/python/estimator/canned')
-rw-r--r-- | tensorflow/python/estimator/canned/boosted_trees.py | 439 |
1 file changed, 269 insertions, 170 deletions
diff --git a/tensorflow/python/estimator/canned/boosted_trees.py b/tensorflow/python/estimator/canned/boosted_trees.py index 3c832c7569..3292e2724d 100644 --- a/tensorflow/python/estimator/canned/boosted_trees.py +++ b/tensorflow/python/estimator/canned/boosted_trees.py @@ -17,6 +17,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import abc import collections import functools @@ -384,6 +385,249 @@ class _StopAtAttemptsHook(session_run_hook.SessionRunHook): run_context.request_stop() +def _get_max_splits(tree_hparams): + """Calculates the max possible number of splits based on tree params.""" + # maximum number of splits possible in the whole tree =2^(D-1)-1 + max_splits = (1 << tree_hparams.max_depth) - 1 + return max_splits + + +class _EnsembleGrower(object): + """Abstract base class for different types of ensemble growers. + + Use it to receive training ops for growing and centering bias, depending + on the implementation (for example, in memory or accumulator-based + distributed): + grower = ...create subclass grower(tree_ensemble, tree_hparams) + grow_op = grower.grow_tree(stats_summaries_list, feature_ids_list, + last_layer_nodes_range) + training_ops.append(grow_op) + """ + + def __init__(self, tree_ensemble, tree_hparams): + """Initializes a grower object. + + Args: + tree_ensemble: A TreeEnsemble variable. + tree_hparams: TODO. collections.namedtuple for hyper parameters. + """ + self._tree_ensemble = tree_ensemble + self._tree_hparams = tree_hparams + + @abc.abstractmethod + def center_bias(self, center_bias_var, gradients, hessians): + """Centers bias, if ready, based on statistics. + + Args: + center_bias_var: A variable that will be updated when bias centering + finished. + gradients: A rank 2 tensor of gradients. + hessians: A rank 2 tensor of hessians. + + Returns: + An operation for centering bias. 
+ """ + + @abc.abstractmethod + def grow_tree(self, stats_summaries_list, feature_ids_list, + last_layer_nodes_range): + """Grows a tree, if ready, based on provided statistics. + + Args: + stats_summaries_list: List of stats summary tensors, representing sums of + gradients and hessians for each feature bucket. + feature_ids_list: a list of lists of feature ids for each bucket size. + last_layer_nodes_range: A tensor representing ids of the nodes in the + current layer, to be split. + + Returns: + An op for growing a tree. + """ + + # ============= Helper methods =========== + + def _center_bias_fn(self, center_bias_var, mean_gradients, mean_hessians): + """Updates the ensembles and cache (if needed) with logits prior.""" + continue_centering = boosted_trees_ops.center_bias( + self._tree_ensemble.resource_handle, + mean_gradients=mean_gradients, + mean_hessians=mean_hessians, + l1=self._tree_hparams.l1, + l2=self._tree_hparams.l2) + return center_bias_var.assign(continue_centering) + + def _grow_tree_from_stats_summaries(self, stats_summaries_list, + feature_ids_list, last_layer_nodes_range): + """Updates ensemble based on the best gains from stats summaries.""" + node_ids_per_feature = [] + gains_list = [] + thresholds_list = [] + left_node_contribs_list = [] + right_node_contribs_list = [] + all_feature_ids = [] + assert len(stats_summaries_list) == len(feature_ids_list) + + max_splits = _get_max_splits(self._tree_hparams) + + for i, feature_ids in enumerate(feature_ids_list): + (numeric_node_ids_per_feature, numeric_gains_list, + numeric_thresholds_list, numeric_left_node_contribs_list, + numeric_right_node_contribs_list) = ( + boosted_trees_ops.calculate_best_gains_per_feature( + node_id_range=last_layer_nodes_range, + stats_summary_list=stats_summaries_list[i], + l1=self._tree_hparams.l1, + l2=self._tree_hparams.l2, + tree_complexity=self._tree_hparams.tree_complexity, + min_node_weight=self._tree_hparams.min_node_weight, + max_splits=max_splits)) + + 
all_feature_ids += feature_ids + node_ids_per_feature += numeric_node_ids_per_feature + gains_list += numeric_gains_list + thresholds_list += numeric_thresholds_list + left_node_contribs_list += numeric_left_node_contribs_list + right_node_contribs_list += numeric_right_node_contribs_list + + grow_op = boosted_trees_ops.update_ensemble( + # Confirm if local_tree_ensemble or tree_ensemble should be used. + self._tree_ensemble.resource_handle, + feature_ids=all_feature_ids, + node_ids=node_ids_per_feature, + gains=gains_list, + thresholds=thresholds_list, + left_node_contribs=left_node_contribs_list, + right_node_contribs=right_node_contribs_list, + learning_rate=self._tree_hparams.learning_rate, + max_depth=self._tree_hparams.max_depth, + pruning_mode=boosted_trees_ops.PruningMode.NO_PRUNING) + return grow_op + + +class _InMemoryEnsembleGrower(_EnsembleGrower): + """A base class for ensemble growers.""" + + def __init__(self, tree_ensemble, tree_hparams): + + super(_InMemoryEnsembleGrower, self).__init__( + tree_ensemble=tree_ensemble, tree_hparams=tree_hparams) + + def center_bias(self, center_bias_var, gradients, hessians): + # For in memory, we already have a full batch of gradients and hessians, + # so just take a mean and proceed with centering. + mean_gradients = array_ops.expand_dims( + math_ops.reduce_mean(gradients, 0), 0) + mean_heassians = array_ops.expand_dims(math_ops.reduce_mean(hessians, 0), 0) + return self._center_bias_fn(center_bias_var, mean_gradients, mean_heassians) + + def grow_tree(self, stats_summaries_list, feature_ids_list, + last_layer_nodes_range): + # For in memory, we already have full data in one batch, so we can grow the + # tree immediately. 
+ return self._grow_tree_from_stats_summaries( + stats_summaries_list, feature_ids_list, last_layer_nodes_range) + + +class _AccumulatorEnsembleGrower(_EnsembleGrower): + """A base class for ensemble growers.""" + + def __init__(self, tree_ensemble, tree_hparams, stamp_token, + n_batches_per_layer, bucket_size_list, is_chief): + super(_AccumulatorEnsembleGrower, self).__init__( + tree_ensemble=tree_ensemble, tree_hparams=tree_hparams) + self._stamp_token = stamp_token + self._n_batches_per_layer = n_batches_per_layer + self._bucket_size_list = bucket_size_list + self._is_chief = is_chief + + def center_bias(self, center_bias_var, gradients, hessians): + # For not in memory situation, we need to accumulate enough of batches first + # before proceeding with centering bias. + + # Create an accumulator. + bias_dependencies = [] + bias_accumulator = data_flow_ops.ConditionalAccumulator( + dtype=dtypes.float32, + # The stats consist of grads and hessians means only. + # TODO(nponomareva): this will change for a multiclass + shape=[2, 1], + shared_name='bias_accumulator') + + grads_and_hess = array_ops.stack([gradients, hessians], axis=0) + grads_and_hess = math_ops.reduce_mean(grads_and_hess, axis=1) + + apply_grad = bias_accumulator.apply_grad(grads_and_hess, self._stamp_token) + bias_dependencies.append(apply_grad) + + # Center bias if enough batches were processed. 
+ with ops.control_dependencies(bias_dependencies): + if not self._is_chief: + return control_flow_ops.no_op() + + def center_bias_from_accumulator(): + accumulated = array_ops.unstack(bias_accumulator.take_grad(1), axis=0) + return self._center_bias_fn(center_bias_var, + array_ops.expand_dims(accumulated[0], 0), + array_ops.expand_dims(accumulated[1], 0)) + + center_bias_op = control_flow_ops.cond( + math_ops.greater_equal(bias_accumulator.num_accumulated(), + self._n_batches_per_layer), + center_bias_from_accumulator, + control_flow_ops.no_op, + name='wait_until_n_batches_for_bias_accumulated') + return center_bias_op + + def grow_tree(self, stats_summaries_list, feature_ids_list, + last_layer_nodes_range): + # For not in memory situation, we need to accumulate enough of batches first + # before proceeding with building a tree layer. + max_splits = _get_max_splits(self._tree_hparams) + + # Prepare accumulators. + accumulators = [] + dependencies = [] + for i, feature_ids in enumerate(feature_ids_list): + stats_summaries = stats_summaries_list[i] + accumulator = data_flow_ops.ConditionalAccumulator( + dtype=dtypes.float32, + # The stats consist of grads and hessians (the last dimension). + shape=[len(feature_ids), max_splits, self._bucket_size_list[i], 2], + shared_name='numeric_stats_summary_accumulator_' + str(i)) + accumulators.append(accumulator) + + apply_grad = accumulator.apply_grad( + array_ops.stack(stats_summaries, axis=0), self._stamp_token) + dependencies.append(apply_grad) + + # Grow the tree if enough batches is accumulated. + with ops.control_dependencies(dependencies): + if not self._is_chief: + return control_flow_ops.no_op() + + min_accumulated = math_ops.reduce_min( + array_ops.stack([acc.num_accumulated() for acc in accumulators])) + + def grow_tree_from_accumulated_summaries_fn(): + """Updates tree with the best layer from accumulated summaries.""" + # Take out the accumulated summaries from the accumulator and grow. 
+ stats_summaries_list = [] + stats_summaries_list = [ + array_ops.unstack(accumulator.take_grad(1), axis=0) + for accumulator in accumulators + ] + grow_op = self._grow_tree_from_stats_summaries( + stats_summaries_list, feature_ids_list, last_layer_nodes_range) + return grow_op + + grow_model = control_flow_ops.cond( + math_ops.greater_equal(min_accumulated, self._n_batches_per_layer), + grow_tree_from_accumulated_summaries_fn, + control_flow_ops.no_op, + name='wait_until_n_batches_accumulated') + return grow_model + + def _bt_model_fn( features, labels, @@ -441,11 +685,6 @@ def _bt_model_fn( raise ValueError('train_in_memory is supported only for ' 'non-distributed training.') worker_device = control_flow_ops.no_op().device - # maximum number of splits possible in the whole tree =2^(D-1)-1 - # TODO(youngheek): perhaps storage could be optimized by storing stats with - # the dimension max_splits_per_layer, instead of max_splits (for the entire - # tree). - max_splits = (1 << tree_hparams.max_depth) - 1 train_op = [] with ops.name_scope(name) as name: # Prepare. @@ -543,6 +782,11 @@ def _bt_model_fn( hessians = gradients_impl.gradients( gradients, logits, name='Hessians')[0] + # TODO(youngheek): perhaps storage could be optimized by storing stats + # with the dimension max_splits_per_layer, instead of max_splits (for the + # entire tree). + max_splits = _get_max_splits(tree_hparams) + stats_summaries_list = [] for i, feature_ids in enumerate(feature_ids_list): num_buckets = bucket_size_list[i] @@ -559,173 +803,28 @@ def _bt_model_fn( ] stats_summaries_list.append(summaries) - # ========= Helper methods for both in and not in memory. 
============== - def grow_tree_from_stats_summaries(stats_summaries_list, - feature_ids_list): - """Updates ensemble based on the best gains from stats summaries.""" - node_ids_per_feature = [] - gains_list = [] - thresholds_list = [] - left_node_contribs_list = [] - right_node_contribs_list = [] - all_feature_ids = [] - - assert len(stats_summaries_list) == len(feature_ids_list) - - for i, feature_ids in enumerate(feature_ids_list): - (numeric_node_ids_per_feature, numeric_gains_list, - numeric_thresholds_list, numeric_left_node_contribs_list, - numeric_right_node_contribs_list) = ( - boosted_trees_ops.calculate_best_gains_per_feature( - node_id_range=last_layer_nodes_range, - stats_summary_list=stats_summaries_list[i], - l1=tree_hparams.l1, - l2=tree_hparams.l2, - tree_complexity=tree_hparams.tree_complexity, - min_node_weight=tree_hparams.min_node_weight, - max_splits=max_splits)) - - all_feature_ids += feature_ids - node_ids_per_feature += numeric_node_ids_per_feature - gains_list += numeric_gains_list - thresholds_list += numeric_thresholds_list - left_node_contribs_list += numeric_left_node_contribs_list - right_node_contribs_list += numeric_right_node_contribs_list - - grow_op = boosted_trees_ops.update_ensemble( - # Confirm if local_tree_ensemble or tree_ensemble should be used. 
- tree_ensemble.resource_handle, - feature_ids=all_feature_ids, - node_ids=node_ids_per_feature, - gains=gains_list, - thresholds=thresholds_list, - left_node_contribs=left_node_contribs_list, - right_node_contribs=right_node_contribs_list, - learning_rate=tree_hparams.learning_rate, - max_depth=tree_hparams.max_depth, - pruning_mode=boosted_trees_ops.PruningMode.NO_PRUNING) - return grow_op - - def _center_bias_fn(mean_gradients, mean_hessians): - """Updates the ensembles and cache (if needed) with logits prior.""" - continue_centering = boosted_trees_ops.center_bias( - tree_ensemble.resource_handle, - mean_gradients=mean_gradients, - mean_hessians=mean_hessians, - l1=tree_hparams.l1, - l2=tree_hparams.l2 - ) - return center_bias_var.assign(continue_centering) - - # ========= End of helper methods. ============== - if train_in_memory and is_single_machine: - train_op.append(distribute_lib.increment_var(global_step)) - - mean_gradients = array_ops.expand_dims( - math_ops.reduce_mean(gradients, 0), 0) - mean_heassians = array_ops.expand_dims( - math_ops.reduce_mean(hessians, 0), 0) - - train_op.append( - control_flow_ops.cond( - center_bias_var, - lambda: _center_bias_fn(mean_gradients, mean_heassians), - functools.partial(grow_tree_from_stats_summaries, - stats_summaries_list, feature_ids_list))) + grower = _InMemoryEnsembleGrower(tree_ensemble, tree_hparams) else: - - def center_bias_not_in_mem(): - """Accumulates the data and updates the logits bias, when ready.""" - bias_dependencies = [] - - bias_accumulator = data_flow_ops.ConditionalAccumulator( - dtype=dtypes.float32, - # The stats consist of grads and hessians means only. 
- # TODO(nponomareva): this will change for a multiclass - shape=[2, 1], - shared_name='bias_accumulator') - - grads_and_hess = array_ops.stack([gradients, hessians], axis=0) - grads_and_hess = math_ops.reduce_mean(grads_and_hess, axis=1) - - apply_grad = bias_accumulator.apply_grad(grads_and_hess, stamp_token) - bias_dependencies.append(apply_grad) - - def center_bias_from_accumulator(): - accumulated = array_ops.unstack( - bias_accumulator.take_grad(1), axis=0) - return _center_bias_fn( - array_ops.expand_dims(accumulated[0], 0), - array_ops.expand_dims(accumulated[1], 0)) - - with ops.control_dependencies(bias_dependencies): - if config.is_chief: - center_bias_op = control_flow_ops.cond( - math_ops.greater_equal(bias_accumulator.num_accumulated(), - n_batches_per_layer), - center_bias_from_accumulator, - control_flow_ops.no_op, - name='wait_until_n_batches_for_bias_accumulated') - - return center_bias_op - else: - return control_flow_ops.no_op() - - def grow_not_in_mem(): - """Accumulates the data and grows a layer when ready.""" - - accumulators = [] - dependencies = [] - for i, feature_ids in enumerate(feature_ids_list): - stats_summaries = stats_summaries_list[i] - accumulator = data_flow_ops.ConditionalAccumulator( - dtype=dtypes.float32, - # The stats consist of grads and hessians (the last dimension). - shape=[len(feature_ids), max_splits, bucket_size_list[i], 2], - shared_name='numeric_stats_summary_accumulator_' + str(i)) - accumulators.append(accumulator) - - apply_grad = accumulator.apply_grad( - array_ops.stack(stats_summaries, axis=0), stamp_token) - dependencies.append(apply_grad) - - def grow_tree_from_accumulated_summaries_fn(): - """Updates tree with the best layer from accumulated summaries.""" - # Take out the accumulated summaries from the accumulator and grow. 
- stats_summaries_list = [] - - stats_summaries_list = [ - array_ops.unstack(accumulator.take_grad(1), axis=0) - for accumulator in accumulators - ] - - grow_op = grow_tree_from_stats_summaries(stats_summaries_list, - feature_ids_list) - return grow_op - - with ops.control_dependencies(dependencies): - if config.is_chief: - min_accumulated = math_ops.reduce_min( - array_ops.stack( - [acc.num_accumulated() for acc in accumulators])) - - grow_model = control_flow_ops.cond( - math_ops.greater_equal(min_accumulated, n_batches_per_layer), - grow_tree_from_accumulated_summaries_fn, - control_flow_ops.no_op, - name='wait_until_n_batches_accumulated') - - return grow_model - else: - return control_flow_ops.no_op() - - update_model = control_flow_ops.cond( - center_bias_var, center_bias_not_in_mem, grow_not_in_mem) - train_op.append(update_model) - with ops.control_dependencies([update_model]): - increment_global = distribute_lib.increment_var(global_step) - train_op.append(increment_global) + grower = _AccumulatorEnsembleGrower(tree_ensemble, tree_hparams, + stamp_token, n_batches_per_layer, + bucket_size_list, config.is_chief) + + update_model = control_flow_ops.cond( + center_bias_var, + functools.partial( + grower.center_bias, + center_bias_var, + gradients, + hessians, + ), + functools.partial(grower.grow_tree, stats_summaries_list, + feature_ids_list, last_layer_nodes_range)) + train_op.append(update_model) + + with ops.control_dependencies([update_model]): + increment_global = distribute_lib.increment_var(global_step) + train_op.append(increment_global) return control_flow_ops.group(train_op, name='train_op') |