1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
|
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Estimators for time series models."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import functools
from tensorflow.contrib.timeseries.python.timeseries import ar_model
from tensorflow.contrib.timeseries.python.timeseries import feature_keys
from tensorflow.contrib.timeseries.python.timeseries import head as ts_head_lib
from tensorflow.contrib.timeseries.python.timeseries import math_utils
from tensorflow.contrib.timeseries.python.timeseries import state_management
from tensorflow.contrib.timeseries.python.timeseries.state_space_models import state_space_model
from tensorflow.contrib.timeseries.python.timeseries.state_space_models import structural_ensemble
from tensorflow.contrib.timeseries.python.timeseries.state_space_models.filtering_postprocessor import StateInterpolatingAnomalyDetector
from tensorflow.python.estimator import estimator_lib
from tensorflow.python.estimator.export import export_lib
from tensorflow.python.feature_column import feature_column
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.framework import tensor_shape
from tensorflow.python.framework import tensor_util
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import parsing_ops
from tensorflow.python.training import training as train
from tensorflow.python.util import nest
class TimeSeriesRegressor(estimator_lib.Estimator):
  """An Estimator to fit and evaluate a time series model."""

  def __init__(self, model, state_manager=None, optimizer=None, model_dir=None,
               config=None, head_type=ts_head_lib.TimeSeriesRegressionHead):
    """Initialize the Estimator.

    Args:
      model: The time series model to wrap (inheriting from TimeSeriesModel).
      state_manager: The state manager to use, or (by default)
        PassthroughStateManager if none is needed.
      optimizer: The optimization algorithm to use when training, inheriting
        from tf.train.Optimizer. Defaults to Adam with step size 0.02.
      model_dir: See `Estimator`.
      config: See `Estimator`.
      head_type: The kind of head to use for the model (inheriting from
        `TimeSeriesRegressionHead`).
    """
    # Accumulates per-mini-batch input statistics with the model's dtype and
    # feature count; handed to the head below.
    input_statistics_generator = math_utils.InputStatisticsFromMiniBatch(
        dtype=model.dtype, num_features=model.num_features)
    if state_manager is None:
      # AR models get a filtering-only manager; everything else defaults to a
      # pass-through manager (no state management needed).
      if isinstance(model, ar_model.ARModel):
        state_manager = state_management.FilteringOnlyStateManager()
      else:
        state_manager = state_management.PassthroughStateManager()
    if optimizer is None:
      optimizer = train.AdamOptimizer(0.02)
    # Saved so build_raw_serving_input_receiver_fn can query num_features,
    # dtype, exogenous feature columns, and the start state at export time.
    self._model = model
    ts_regression_head = head_type(
        model=model, state_manager=state_manager, optimizer=optimizer,
        input_statistics_generator=input_statistics_generator)
    # The head's create_estimator_spec serves directly as the Estimator's
    # model_fn.
    model_fn = ts_regression_head.create_estimator_spec
    super(TimeSeriesRegressor, self).__init__(
        model_fn=model_fn,
        model_dir=model_dir,
        config=config)

  # TODO(allenl): A parsing input receiver function, which takes a serialized
  # tf.Example containing all features (times, values, any exogenous features)
  # and serialized model state (possibly also as a tf.Example).
  def build_raw_serving_input_receiver_fn(self,
                                          default_batch_size=None,
                                          default_series_length=None):
    """Build an input_receiver_fn for export_savedmodel which accepts arrays.

    Automatically creates placeholders for exogenous `FeatureColumn`s passed to
    the model.

    Args:
      default_batch_size: If specified, must be a scalar integer. Sets the batch
        size in the static shape information of all feature Tensors, which means
        only this batch size will be accepted by the exported model. If None
        (default), static shape information for batch sizes is omitted.
      default_series_length: If specified, must be a scalar integer. Sets the
        series length in the static shape information of all feature Tensors,
        which means only this series length will be accepted by the exported
        model. If None (default), static shape information for series length is
        omitted.

    Returns:
      An input_receiver_fn which may be passed to the Estimator's
      export_savedmodel.
    """
    def _serving_input_receiver_fn():
      """A receiver function to be passed to export_savedmodel."""
      placeholders = {}
      # Times are always required; shape is [batch, series length], with
      # either dimension statically pinned only when a default was given.
      time_placeholder = array_ops.placeholder(
          name=feature_keys.TrainEvalFeatures.TIMES,
          dtype=dtypes.int64,
          shape=[default_batch_size, default_series_length])
      placeholders[feature_keys.TrainEvalFeatures.TIMES] = time_placeholder
      # Values are only necessary when filtering. For prediction the default
      # value will be ignored.
      placeholders[feature_keys.TrainEvalFeatures.VALUES] = (
          array_ops.placeholder_with_default(
              name=feature_keys.TrainEvalFeatures.VALUES,
              # The zeros default is sized 0 along any dimension without a
              # specified default, so it carries no real data; it only lets
              # prediction requests omit VALUES entirely.
              input=array_ops.zeros(
                  shape=[
                      default_batch_size
                      if default_batch_size else 0, default_series_length
                      if default_series_length else 0, self._model.num_features
                  ],
                  dtype=self._model.dtype),
              shape=(default_batch_size, default_series_length,
                     self._model.num_features)))
      if self._model.exogenous_feature_columns:
        with ops.Graph().as_default():
          # Default placeholders have only an unknown batch dimension. Make them
          # in a separate graph, then splice in the series length to the shapes
          # and re-create them in the outer graph.
          parsed_features = (
              feature_column.make_parse_example_spec(
                  self._model.exogenous_feature_columns))
          placeholder_features = parsing_ops.parse_example(
              serialized=array_ops.placeholder(
                  shape=[None], dtype=dtypes.string),
              features=parsed_features)
          # Only static (shape, dtype) metadata escapes the scratch graph.
          exogenous_feature_shapes = {
              key: (value.get_shape(), value.dtype) for key, value
              in placeholder_features.items()}
        # Back in the outer graph: rebuild each exogenous placeholder with
        # [batch, series length] prepended to its per-example shape.
        for feature_key, (batch_only_feature_shape, value_dtype) in (
            exogenous_feature_shapes.items()):
          batch_only_feature_shape = (
              batch_only_feature_shape.with_rank_at_least(1).as_list())
          feature_shape = ([default_batch_size, default_series_length]
                           + batch_only_feature_shape[1:])
          placeholders[feature_key] = array_ops.placeholder(
              dtype=value_dtype, name=feature_key, shape=feature_shape)
      # Models may not know the shape of their state without creating some
      # variables/ops. Avoid polluting the default graph by making a new one. We
      # use only static metadata from the returned Tensors.
      with ops.Graph().as_default():
        self._model.initialize_graph()

        # Evaluate the initial state as same-dtype "zero" values. These zero
        # constants aren't used, but are necessary for feeding to
        # placeholder_with_default for the "cold start" case where state is not
        # fed to the model.
        def _zeros_like_constant(tensor):
          return tensor_util.constant_value(array_ops.zeros_like(tensor))
        start_state = nest.map_structure(
            _zeros_like_constant, self._model.get_start_state())
      # Dynamic batch size from the TIMES placeholder, used to broadcast the
      # per-example start state across the batch.
      batch_size_tensor = array_ops.shape(time_placeholder)[0]
      for prefixed_state_name, state in ts_head_lib.state_to_dictionary(
          start_state).items():
        state_shape_with_batch = tensor_shape.TensorShape(
            (default_batch_size,)).concatenate(state.shape)
        # Tile state along a new leading batch dimension: [1, ...state shape]
        # repeated batch_size times.
        default_state_broadcast = array_ops.tile(
            state[None, ...],
            multiples=array_ops.concat(
                [batch_size_tensor[None],
                 array_ops.ones(len(state.shape), dtype=dtypes.int32)],
                axis=0))
        placeholders[prefixed_state_name] = array_ops.placeholder_with_default(
            input=default_state_broadcast,
            name=prefixed_state_name,
            shape=state_shape_with_batch)
      # Placeholders double as both receiver tensors and features.
      return export_lib.ServingInputReceiver(placeholders, placeholders)
    return _serving_input_receiver_fn
class ARRegressor(TimeSeriesRegressor):
  """An Estimator for an (optionally non-linear) autoregressive model.

  ARRegressor is a window-based model, inputting fixed windows of length
  `input_window_size` and outputting fixed windows of length
  `output_window_size`. These two parameters must add up to the window_size
  passed to the `Chunker` used to create an `input_fn` for training or
  evaluation. `RandomWindowInputFn` is suggested for both training and
  evaluation, although it may be seeded for deterministic evaluation.
  """

  def __init__(
      self, periodicities, input_window_size, output_window_size,
      num_features, exogenous_feature_columns=None, num_time_buckets=10,
      loss=ar_model.ARModel.NORMAL_LIKELIHOOD_LOSS, hidden_layer_sizes=None,
      anomaly_prior_probability=None, anomaly_distribution=None,
      optimizer=None, model_dir=None, config=None):
    """Initialize the Estimator.

    Args:
      periodicities: periodicities of the input data, in the same units as the
        time feature. Note this can be a single value or a list of values for
        multiple periodicities.
      input_window_size: Number of past time steps of data to look at when doing
        the regression.
      output_window_size: Number of future time steps to predict. Note that
        setting it to > 1 empirically seems to give a better fit.
      num_features: The dimensionality of the time series (one for univariate,
        more than one for multivariate).
      exogenous_feature_columns: A list of `tf.feature_column`s (for example
        `tf.feature_column.embedding_column`) corresponding to exogenous
        features which provide extra information to the model but are not part
        of the series to be predicted. Passed to
        `tf.feature_column.input_layer`.
      num_time_buckets: Number of buckets into which to divide (time %
        periodicity) for generating time based features.
      loss: Loss function to use for training. Currently supported values are
        SQUARED_LOSS and NORMAL_LIKELIHOOD_LOSS. Note that for
        NORMAL_LIKELIHOOD_LOSS, we train the covariance term as well. For
        SQUARED_LOSS, the evaluation loss is reported based on un-scaled
        observations and predictions, while the training loss is computed on
        normalized data.
      hidden_layer_sizes: list of sizes of hidden layers.
      anomaly_prior_probability: If specified, constructs a mixture model under
        which anomalies (modeled with `anomaly_distribution`) have this prior
        probability. See `AnomalyMixtureARModel`.
      anomaly_distribution: May not be specified unless
        anomaly_prior_probability is specified and is not None. Controls the
        distribution of anomalies under the mixture model. Currently either
        `ar_model.AnomalyMixtureARModel.GAUSSIAN_ANOMALY` or
        `ar_model.AnomalyMixtureARModel.CAUCHY_ANOMALY`. See
        `AnomalyMixtureARModel`. Defaults to `GAUSSIAN_ANOMALY`.
      optimizer: The optimization algorithm to use when training, inheriting
        from tf.train.Optimizer. Defaults to Adagrad with step size 0.1.
      model_dir: See `Estimator`.
      config: See `Estimator`.

    Raises:
      ValueError: For invalid combinations of arguments.
    """
    if optimizer is None:
      optimizer = train.AdagradOptimizer(0.1)
    if anomaly_prior_probability is None and anomaly_distribution is not None:
      raise ValueError("anomaly_prior_probability is required if "
                       "anomaly_distribution is specified.")
    # Both model variants share the same flat (fully connected) prediction
    # network; build the factory once.
    prediction_model_factory = functools.partial(
        ar_model.FlatPredictionModel,
        hidden_layer_sizes=hidden_layer_sizes)
    if anomaly_prior_probability is None:
      # No anomaly modeling requested. anomaly_distribution is necessarily
      # None here (checked above), so a plain ARModel suffices.
      model = ar_model.ARModel(
          periodicities=periodicities, num_features=num_features,
          prediction_model_factory=prediction_model_factory,
          exogenous_feature_columns=exogenous_feature_columns,
          num_time_buckets=num_time_buckets,
          input_window_size=input_window_size,
          output_window_size=output_window_size, loss=loss)
    else:
      if loss != ar_model.ARModel.NORMAL_LIKELIHOOD_LOSS:
        raise ValueError(
            "AnomalyMixtureARModel only supports "
            "ar_model.ARModel.NORMAL_LIKELIHOOD_LOSS for its loss argument.")
      # Apply the documented default here, where it is actually used.
      # (Previously the default was assigned in the non-anomaly branch, where
      # anomaly_distribution was guaranteed None and never read, so None was
      # passed through to AnomalyMixtureARModel.)
      if anomaly_distribution is None:
        anomaly_distribution = ar_model.AnomalyMixtureARModel.GAUSSIAN_ANOMALY
      model = ar_model.AnomalyMixtureARModel(
          periodicities=periodicities,
          input_window_size=input_window_size,
          output_window_size=output_window_size,
          num_features=num_features,
          prediction_model_factory=prediction_model_factory,
          exogenous_feature_columns=exogenous_feature_columns,
          num_time_buckets=num_time_buckets,
          anomaly_prior_probability=anomaly_prior_probability,
          anomaly_distribution=anomaly_distribution)
    # AR models are window-based and only need filtering-style state.
    state_manager = state_management.FilteringOnlyStateManager()
    super(ARRegressor, self).__init__(
        model=model,
        state_manager=state_manager,
        optimizer=optimizer,
        model_dir=model_dir,
        config=config)
class StateSpaceRegressor(TimeSeriesRegressor):
  """An Estimator for general state space models."""

  def __init__(self, model, state_manager=None, optimizer=None, model_dir=None,
               config=None, head_type=ts_head_lib.TimeSeriesRegressionHead):
    """See TimeSeriesRegressor. Uses the ChainingStateManager by default."""
    # Guard clause: anything other than a StateSpaceModel is rejected up
    # front, before any Estimator machinery is constructed.
    if not isinstance(model, state_space_model.StateSpaceModel):
      raise ValueError(
          "StateSpaceRegressor only supports state space models (children of "
          "StateSpaceModel) in its `model` argument, got {}.".format(model))
    # State space models are sequential, so chained state management is the
    # natural default when the caller does not supply a manager.
    effective_state_manager = (
        state_management.ChainingStateManager()
        if state_manager is None else state_manager)
    super(StateSpaceRegressor, self).__init__(
        model=model,
        state_manager=effective_state_manager,
        optimizer=optimizer,
        model_dir=model_dir,
        config=config,
        head_type=head_type)
class StructuralEnsembleRegressor(StateSpaceRegressor):
  """An Estimator for structural time series models.

  "Structural" refers to the fact that this model explicitly accounts for
  structure in the data, such as periodicity and trends.

  `StructuralEnsembleRegressor` is a state space model. It contains components
  for modeling level, local linear trends, periodicity, and mean-reverting
  transients via a moving average component. Multivariate series are fit with
  full covariance matrices for observation and latent state transition noise,
  each feature of the multivariate series having its own latent components.

  Note that unlike `ARRegressor`, `StructuralEnsembleRegressor` is sequential,
  and so accepts variable window sizes with the same model.

  For training, `RandomWindowInputFn` is recommended as an `input_fn`. Model
  state is managed through `ChainingStateManager`: since state space models are
  inherently sequential, we save state from previous iterations to get
  approximate/eventual consistency while achieving good performance through
  batched computation.

  For evaluation, either pass a significant chunk of the series in a single
  window (e.g. set `window_size` to the whole series with
  `WholeDatasetInputFn`), or use enough random evaluation iterations to cover
  several passes through the whole dataset. Either method will ensure that stale
  saved state has been flushed.
  """

  def __init__(self,
               periodicities,
               num_features,
               cycle_num_latent_values=11,
               moving_average_order=4,
               autoregressive_order=0,
               exogenous_feature_columns=None,
               exogenous_update_condition=None,
               dtype=dtypes.float64,
               anomaly_prior_probability=None,
               optimizer=None,
               model_dir=None,
               config=None,
               head_type=ts_head_lib.TimeSeriesRegressionHead):
    """Initialize the Estimator.

    Args:
      periodicities: The expected periodicity of the data (for example 24 if
        feeding hourly data with a daily periodicity, or 60 * 24 if feeding
        minute-level data with daily periodicity). Either a scalar or a
        list. This parameter can be any real value, and does not control the
        size of the model. However, increasing this without increasing
        `num_values_per_cycle` will lead to smoother periodic behavior, as the
        same number of distinct values will be cycled through over a longer
        period of time.
      num_features: The dimensionality of the time series (one for univariate,
        more than one for multivariate).
      cycle_num_latent_values: Along with `moving_average_order` and
        `num_features`, controls the latent state size of the model. Square
        matrices of size `num_features * (moving_average_order +
        cycle_num_latent_values + 3)` are created and multiplied, so larger
        values may be slow. The trade-off is with resolution: cycling between
        a smaller number of latent values means that only smoother functions
        can be modeled.
      moving_average_order: Controls model size (along with
        `cycle_num_latent_values` and `autoregressive_order`) and the number
        of steps before transient deviations revert to the mean defined by the
        period and level/trend components.
      autoregressive_order: Each contribution from this component is a linear
        combination of this many previous contributions. Also helps to
        determine the model size. Learning autoregressive coefficients
        typically requires more steps and a smaller step size than other
        components.
      exogenous_feature_columns: A list of `tf.feature_column`s (for example
        `tf.feature_column.embedding_column`) corresponding to exogenous
        features which provide extra information to the model but are not part
        of the series to be predicted. Passed to
        `tf.feature_column.input_layer`.
      exogenous_update_condition: A function taking two Tensor arguments,
        `times` (shape [batch size]) and `features` (a dictionary mapping
        exogenous feature keys to Tensors with shapes [batch size, ...]), and
        returning a boolean Tensor with shape [batch size] indicating whether
        state should be updated using exogenous features for each part of the
        batch. Where it is False, no exogenous update is performed. If None
        (default), exogenous updates are always performed. Useful for avoiding
        "leaky" frequent exogenous updates when sparse updates are
        desired. Called only during graph construction. See the "known
        anomaly" example for example usage.
      dtype: The floating point data type to compute with. float32 may be
        faster, but can be problematic for larger models and longer time series.
      anomaly_prior_probability: If not None, the model attempts to
        automatically detect and ignore anomalies during training. This
        parameter then controls the prior probability of an anomaly. Values
        closer to 0 mean that points will be discarded less frequently. The
        default value (None) means that anomalies are not discarded, which may
        be slightly faster.
      optimizer: The optimization algorithm to use when training, inheriting
        from tf.train.Optimizer. Defaults to Adam with step size 0.02.
      model_dir: See `Estimator`.
      config: See `Estimator`.
      head_type: The kind of head to use for the model (inheriting from
        `TimeSeriesRegressionHead`).
    """
    # Anomaly discarding is implemented as a filtering postprocessor; omit it
    # entirely when no prior probability was given.
    filtering_postprocessor = (
        None if anomaly_prior_probability is None
        else StateInterpolatingAnomalyDetector(
            anomaly_prior_probability=anomaly_prior_probability))
    configuration = state_space_model.StateSpaceModelConfiguration(
        num_features=num_features,
        dtype=dtype,
        filtering_postprocessor=filtering_postprocessor,
        exogenous_feature_columns=exogenous_feature_columns,
        exogenous_update_condition=exogenous_update_condition)
    ensemble = structural_ensemble.MultiResolutionStructuralEnsemble(
        cycle_num_latent_values=cycle_num_latent_values,
        moving_average_order=moving_average_order,
        autoregressive_order=autoregressive_order,
        periodicities=periodicities,
        configuration=configuration)
    # StateSpaceRegressor supplies the ChainingStateManager default.
    super(StructuralEnsembleRegressor, self).__init__(
        model=ensemble,
        optimizer=optimizer,
        model_dir=model_dir,
        config=config,
        head_type=head_type)
|