aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/lib/io/zlib_outputbuffer.cc
blob: 84b47c171f23c28378d664d39b1892f68d241c96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/lib/io/zlib_outputbuffer.h"

#include "tensorflow/core/lib/core/errors.h"

namespace tensorflow {
namespace io {

// Builds a buffer that will write zlib-compressed data to `file`.
//
// The constructor only stores its arguments and allocates storage: an input
// buffer of `input_buffer_bytes`, an output buffer of `output_buffer_bytes`
// and a z_stream object. The z_stream is not configured for deflation here;
// that is done by Init().
ZlibOutputBuffer::ZlibOutputBuffer(
    WritableFile* file,
    int32 input_buffer_bytes,  // size of z_stream.next_in buffer
    int32 output_buffer_bytes,  // size of z_stream.next_out buffer
    const ZlibCompressionOptions&
        zlib_options)  // deflate parameters later passed to deflateInit2()
    : file_(file),
      init_status_(),
      input_buffer_capacity_(input_buffer_bytes),
      output_buffer_capacity_(output_buffer_bytes),
      z_stream_input_(new Bytef[input_buffer_bytes]),
      z_stream_output_(new Bytef[output_buffer_bytes]),
      zlib_options_(zlib_options),
      // Allocated without initialization; Init() zeroes it before use.
      z_stream_(new z_stream) {}

// Logs a warning when the object is destroyed while it still holds a
// z_stream. The stream is released by a successful Close() (and by a failed
// Init()), so a live stream here means pending data may never have been
// written out.
ZlibOutputBuffer::~ZlibOutputBuffer() {
  const bool stream_released = (z_stream_ == nullptr);
  if (stream_released) {
    return;
  }
  LOG(WARNING) << "ZlibOutputBuffer::Close() not called. Possible data loss";
}

// Configures the z_stream for deflation using the stored zlib options.
//
// Returns:
//   - InvalidArgument if the output buffer capacity is not greater than 1.
//   - InvalidArgument if deflateInit2() does not return Z_OK; in that case the
//     z_stream is released (z_stream_ becomes null).
//   - OK otherwise, with the stream pointing at the (empty) input buffer and
//     the whole output buffer marked as available.
Status ZlibOutputBuffer::Init() {
  // Output buffer size should be greater than 1 because deflation needs at
  // least one byte for book keeping etc.
  if (output_buffer_capacity_ <= 1) {
    return errors::InvalidArgument(
        "output_buffer_bytes should be greater than "
        "1");
  }

  // The constructor allocates the z_stream without initializing it, so clear
  // it and select zlib's default allocator before handing it to deflateInit2.
  memset(z_stream_.get(), 0, sizeof(z_stream));
  z_stream_->zalloc = Z_NULL;
  z_stream_->zfree = Z_NULL;
  z_stream_->opaque = Z_NULL;

  int status =
      deflateInit2(z_stream_.get(), zlib_options_.compression_level,
                   zlib_options_.compression_method, zlib_options_.window_bits,
                   zlib_options_.mem_level, zlib_options_.compression_strategy);
  if (status != Z_OK) {
    z_stream_.reset(nullptr);
    // Trailing space keeps the numeric status code separated from the text,
    // matching the format of the message built in Deflate().
    return errors::InvalidArgument("deflateInit failed with status ", status);
  }

  // No input is pending yet and the whole output buffer is free.
  z_stream_->next_in = z_stream_input_.get();
  z_stream_->next_out = z_stream_output_.get();
  z_stream_->avail_in = 0;
  z_stream_->avail_out = output_buffer_capacity_;
  return Status::OK();
}

// Returns the input buffer capacity minus the number of bytes the z_stream
// still has pending as input (avail_in).
int32 ZlibOutputBuffer::AvailableInputSpace() const {
  const auto pending_input_bytes = z_stream_->avail_in;
  return input_buffer_capacity_ - pending_input_bytes;
}

// Copies `data` into the input buffer, directly behind the bytes that are
// still waiting to be deflated.
//
// CHECK-fails if `data` is larger than AvailableInputSpace().
void ZlibOutputBuffer::AddToInputBuffer(StringPiece data) {
  size_t incoming_bytes = data.size();
  CHECK_LE(incoming_bytes, AvailableInputSpace());

  // Layout of the input buffer:
  //
  // [....................input_buffer_capacity_...............]
  // [<...consumed.....><...avail_in...>......free tail........]
  //  ^                 ^
  //  |                 |
  //  z_stream_input_   next_in
  //
  // next_in may point somewhere inside the buffer, with avail_in bytes after
  // it still to be read. Whenever `data` fits into the free tail it is simply
  // appended there, which avoids moving the pending bytes. Only when the tail
  // is too small are the pending bytes shifted to the head of the buffer,
  // reclaiming the already-consumed prefix, before `data` is appended.
  // TODO(srbs): This could be avoided if we had a circular buffer.
  int32 consumed_bytes = z_stream_->next_in - z_stream_input_.get();
  int32 pending_bytes = z_stream_->avail_in;
  int32 tail_room = input_buffer_capacity_ - (consumed_bytes + pending_bytes);

  const bool fits_in_tail = !(incoming_bytes > tail_room);
  if (!fits_in_tail) {
    // Compact: slide the pending bytes down to the start of the buffer.
    memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in);
    z_stream_->next_in = z_stream_input_.get();
  }

  // Append right after the pending bytes and account for the new input.
  memcpy(z_stream_->next_in + z_stream_->avail_in, data.data(), incoming_bytes);
  z_stream_->avail_in += incoming_bytes;
}

// Deflates everything currently pending in the input buffer.
//
// Uses Z_FINISH when `last` is true and the configured flush mode otherwise.
// Compressed output accumulates in the output buffer, which is written to the
// file whenever it has to be emptied. On success the input pointer is rewound
// to the start of the input buffer. Returns the first error reported by
// FlushOutputBufferToFile() or Deflate().
Status ZlibOutputBuffer::DeflateBuffered(bool last) {
  int flush_mode = zlib_options_.flush_mode;
  if (last) {
    flush_mode = Z_FINISH;
  }

  for (;;) {
    // From zlib manual (http://www.zlib.net/manual.html):
    //
    // "In the case of a Z_FULL_FLUSH or Z_SYNC_FLUSH, make sure that
    // avail_out is greater than six to avoid repeated flush markers due
    // to avail_out == 0 on return."
    //
    // So the output buffer is emptied to the file either when it is
    // completely full, or when a sync/full flush is requested and fewer than
    // six bytes of output space remain.
    bool must_drain_output = (z_stream_->avail_out == 0);
    if (!must_drain_output) {
      must_drain_output =
          IsSyncOrFullFlush(flush_mode) && z_stream_->avail_out < 6;
    }
    if (must_drain_output) {
      TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    }

    TF_RETURN_IF_ERROR(Deflate(flush_mode));

    // Output space left over means deflate() was not cut short by a full
    // output buffer, so there is nothing more to do.
    if (z_stream_->avail_out != 0) {
      break;
    }
  }

  DCHECK(z_stream_->avail_in == 0);
  z_stream_->next_in = z_stream_input_.get();
  return Status::OK();
}

// Appends the compressed bytes held in the output buffer to the file.
//
// Does nothing and returns OK when the output buffer is empty. When the
// append succeeds, the output buffer is marked empty again; when it fails the
// buffer is left as it was and the append error is returned.
Status ZlibOutputBuffer::FlushOutputBufferToFile() {
  const uint32 pending_output_bytes =
      output_buffer_capacity_ - z_stream_->avail_out;
  if (pending_output_bytes == 0) {
    return Status::OK();
  }

  const StringPiece compressed(
      reinterpret_cast<char*>(z_stream_output_.get()), pending_output_bytes);
  Status append_status = file_->Append(compressed);
  if (append_status.ok()) {
    // Everything was handed to the file: reuse the whole buffer.
    z_stream_->next_out = z_stream_output_.get();
    z_stream_->avail_out = output_buffer_capacity_;
  }
  return append_status;
}

// Queues `data` for compression.
//
// If there is sufficient free space in z_stream_input_ to fit `data` it is
// copied there and the call returns. Otherwise the existing contents of
// z_stream_input_ are deflated first; if `data` fits afterwards it is copied
// into the input buffer, else it is deflated directly from the caller's
// memory.
//
// The deflated output is accumulated in z_stream_output_ and gets written to
// file as and when needed.
//
// Returns OK on success, or the first error reported while deflating or while
// writing compressed output to the file. Whatever the outcome of the direct
// path, the z_stream input fields refer to the internal input buffer again
// when this function returns.
Status ZlibOutputBuffer::Append(const StringPiece& data) {
  const size_t bytes_to_write = data.size();

  if (bytes_to_write <= AvailableInputSpace()) {
    AddToInputBuffer(data);
    return Status::OK();
  }

  TF_RETURN_IF_ERROR(DeflateBuffered());

  // At this point input stream should be empty.
  if (bytes_to_write <= AvailableInputSpace()) {
    AddToInputBuffer(data);
    return Status::OK();
  }

  // `data` is too large to fit in input buffer so we deflate it directly.
  // Note that at this point we have already deflated all existing input so
  // we do not need to backup next_in and avail_in.
  //
  // z_stream::avail_in is a zlib `uInt`, which can be narrower than size_t.
  // Assigning the full size in one go would silently drop the high bits for
  // very large payloads, so the data is fed to zlib in chunks that are
  // guaranteed to fit.
  const size_t max_chunk_bytes = static_cast<uInt>(-1);
  const Bytef* cursor = reinterpret_cast<const Bytef*>(data.data());
  size_t remaining = bytes_to_write;
  Status status = Status::OK();

  while (status.ok() && remaining > 0) {
    const uInt chunk_bytes = static_cast<uInt>(
        remaining < max_chunk_bytes ? remaining : max_chunk_bytes);
    // zlib only reads through next_in; the cast is needed because the field
    // is declared as a pointer to non-const.
    z_stream_->next_in = const_cast<Bytef*>(cursor);
    z_stream_->avail_in = chunk_bytes;

    do {
      if (z_stream_->avail_out == 0) {
        // No available output space.
        // Write output buffer to file.
        status = FlushOutputBufferToFile();
        if (!status.ok()) {
          break;
        }
      }
      status = Deflate(zlib_options_.flush_mode);
    } while (status.ok() && z_stream_->avail_out == 0);

    cursor += chunk_bytes;
    remaining -= chunk_bytes;
  }

  if (status.ok()) {
    DCHECK(z_stream_->avail_in == 0);  // All input will be used up.
  }

  // Restore z_stream input pointers. This is done on the error path too so
  // that the stream never keeps referring to the caller's memory once this
  // function has returned.
  z_stream_->next_in = z_stream_input_.get();
  z_stream_->avail_in = 0;

  return status;
}

// Deflates all pending input with the configured flush mode and then writes
// the resulting compressed bytes to the file. Returns the first error
// encountered, or OK.
Status ZlibOutputBuffer::Flush() {
  TF_RETURN_IF_ERROR(DeflateBuffered());
  return FlushOutputBufferToFile();
}

// Flushes this buffer and, if that succeeded, calls Sync() on the underlying
// file. A flush error is returned without syncing the file.
Status ZlibOutputBuffer::Sync() {
  const Status flush_status = Flush();
  if (!flush_status.ok()) {
    return flush_status;
  }
  return file_->Sync();
}

// Finishes the compressed stream and releases the z_stream.
//
// Returns OK immediately when there is no z_stream (it has already been
// released). Otherwise deflates the pending input with Z_FINISH, writes the
// remaining output to the file, ends the zlib stream and drops it. If either
// of the first two steps fails, that error is returned and the z_stream is
// kept.
Status ZlibOutputBuffer::Close() {
  if (!z_stream_) {
    return Status::OK();
  }

  TF_RETURN_IF_ERROR(DeflateBuffered(true));
  TF_RETURN_IF_ERROR(FlushOutputBufferToFile());

  deflateEnd(z_stream_.get());
  z_stream_.reset(nullptr);
  return Status::OK();
}

// Runs one deflate() call on the stream with the given flush mode and
// converts its return code to a Status.
//
// Z_OK and Z_BUF_ERROR are reported as OK, as is Z_STREAM_END when `flush` is
// Z_FINISH. Every other combination yields a DataLoss error that carries the
// numeric code and, when zlib provides one, its message.
Status ZlibOutputBuffer::Deflate(int flush) {
  const int return_code = deflate(z_stream_.get(), flush);

  switch (return_code) {
    case Z_OK:
    case Z_BUF_ERROR:
      return Status::OK();
    case Z_STREAM_END:
      // The end of the stream is only expected when finishing.
      if (flush == Z_FINISH) {
        return Status::OK();
      }
      break;
    default:
      break;
  }

  string message =
      strings::StrCat("deflate() failed with error ", return_code);
  if (z_stream_->msg != nullptr) {
    strings::StrAppend(&message, ": ", z_stream_->msg);
  }
  return errors::DataLoss(message);
}

}  // namespace io
}  // namespace tensorflow