aboutsummaryrefslogtreecommitdiffhomepage
path: root/tensorflow/core/lib/io/snappy/snappy_outputbuffer.cc
blob: be1fa22c69c27a5c57e3c397076a66dfe05eb035 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"

namespace tensorflow {
namespace io {

// Constructs an output buffer that compresses data destined for `file`.
//
// Args:
//   file: destination that compressed bytes are appended to. Only the pointer
//     is stored here; NOTE(review): presumably the caller retains ownership
//     and must keep `file` alive for the lifetime of this object -- confirm
//     against the header.
//   input_buffer_bytes: size, in bytes, of the staging buffer that holds
//     not-yet-compressed input.
//   output_buffer_bytes: size, in bytes, of the buffer that holds compressed
//     output before it is appended to `file`.
//
// NOTE(review): avail_in_ is not set in this initializer list; presumably it
// has a default member initializer of 0 in the header -- confirm.
SnappyOutputBuffer::SnappyOutputBuffer(WritableFile* file,
                                       int32 input_buffer_bytes,
                                       int32 output_buffer_bytes)
    : file_(file),
      // Staging area for uncompressed input; next_in_ starts at its head.
      input_buffer_(new char[input_buffer_bytes]),
      input_buffer_capacity_(input_buffer_bytes),
      next_in_(input_buffer_.get()),
      // Staging area for compressed output; initially entirely free, so
      // next_out_ starts at its head and avail_out_ equals the full capacity.
      output_buffer_(new char[output_buffer_bytes]),
      output_buffer_capacity_(output_buffer_bytes),
      next_out_(output_buffer_.get()),
      avail_out_(output_buffer_bytes) {}

// Accepts `data` for compression and eventual writing to the file.
//
// Small writes are staged in input_buffer_ and compressed later. When `data`
// does not fit, the staged input is compressed first; if `data` still does not
// fit in the (now empty) input buffer it is compressed directly from the
// caller's memory without being copied.
//
// Returns OK on success, or the first error reported while compressing or
// while flushing the output buffer to the file.
Status SnappyOutputBuffer::Write(StringPiece data) {
  // The deflated output is accumulated in output_buffer_ and gets written to
  // file as and when needed.

  size_t bytes_to_write = data.size();

  // If there is sufficient free space in input_buffer_ to fit data we
  // add it there and return.
  if (bytes_to_write <= AvailableInputSpace()) {
    AddToInputBuffer(data);
    return Status::OK();
  }

  // If there isn't enough available space in the input_buffer_ we empty it
  // by compressing its contents. If data now fits in input_buffer_
  // we add it there else we directly deflate it.
  TF_RETURN_IF_ERROR(DeflateBuffered());

  // input_buffer_ should be empty at this point.
  if (bytes_to_write <= AvailableInputSpace()) {
    AddToInputBuffer(data);
    return Status::OK();
  }

  // `data` is too large to fit in input buffer so we deflate it directly.
  // Note that at this point we have already deflated all existing input so
  // we do not need to backup next_in and avail_in.
  next_in_ = const_cast<char*>(data.data());
  avail_in_ = bytes_to_write;

  const Status deflate_status = Deflate();

  // Whatever the outcome, next_in_ must point back into our own buffer before
  // returning: `data` belongs to the caller and may not outlive this call.
  next_in_ = input_buffer_.get();

  if (!deflate_status.ok()) {
    // Deflate() leaves avail_in_ untouched on failure. Drop the borrowed
    // input so that a later Write()/Flush() does not try to compress
    // `bytes_to_write` bytes that were never copied into input_buffer_.
    avail_in_ = 0;
    return deflate_status;
  }

  DCHECK(avail_in_ == 0);  // All input will be used up.

  return Status::OK();
}

// Compresses any input still staged in input_buffer_ and then appends the
// whole output buffer to the file. Returns the first error encountered, or OK.
Status SnappyOutputBuffer::Flush() {
  const Status deflate_status = DeflateBuffered();
  if (!deflate_status.ok()) {
    return deflate_status;
  }
  return FlushOutputBufferToFile();
}

// Returns how many more bytes can be staged in input_buffer_: its capacity
// minus the bytes that are currently waiting to be compressed.
int32 SnappyOutputBuffer::AvailableInputSpace() const {
  const auto unused_bytes = input_buffer_capacity_ - avail_in_;
  return unused_bytes;
}

// Appends `data` after the unread bytes in input_buffer_.
//
// Precondition (checked in debug builds): `data` fits in the space reported by
// AvailableInputSpace().
void SnappyOutputBuffer::AddToInputBuffer(StringPiece data) {
  const size_t incoming_bytes = data.size();
  DCHECK_LE(incoming_bytes, AvailableInputSpace());

  // Layout of the input buffer:
  //
  // [....................input_buffer_capacity_...............]
  // [<...read_bytes...><...avail_in...>......empty space......]
  //  ^                 ^
  //  |                 |
  //  input_buffer_   next_in
  //
  // next_in_ may point somewhere inside the buffer, followed by avail_in_
  // unread bytes. To avoid moving those unread bytes we first try to place
  // `data` in the empty space at the tail. Only when the tail is too small do
  // we slide the unread bytes to the head of the buffer, reclaiming the
  // already-read prefix, and then append `data` behind them.
  // TODO(srbs): This could be avoided if we had a circular buffer.

  char* const buffer_start = input_buffer_.get();
  const int32 consumed_bytes = next_in_ - buffer_start;
  const int32 pending_bytes = avail_in_;
  const int32 tail_capacity =
      input_buffer_capacity_ - (consumed_bytes + pending_bytes);

  if (incoming_bytes > tail_capacity) {
    // Not enough room at the tail: compact the unread bytes to the front.
    memmove(buffer_start, next_in_, avail_in_);
    next_in_ = buffer_start;
  }

  char* const append_at = next_in_ + avail_in_;
  memcpy(append_at, data.data(), incoming_bytes);
  avail_in_ += incoming_bytes;
}

// Copies `length` bytes starting at `data` into the output buffer, flushing
// the buffer to the file every time it becomes full. Returns the first flush
// error, or OK once every byte has been copied.
Status SnappyOutputBuffer::AddToOutputBuffer(const char* data, size_t length) {
  const char* source = data;
  size_t remaining = length;
  while (remaining > 0) {
    // Copy as much as fits in the free part of the output buffer.
    const size_t chunk = std::min(remaining, avail_out_);
    memcpy(next_out_, source, chunk);
    source += chunk;
    remaining -= chunk;
    next_out_ += chunk;
    avail_out_ -= chunk;
    if (avail_out_ != 0) {
      continue;
    }
    // The output buffer is full: drain it to the file before continuing.
    TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
  }
  return Status::OK();
}

// Compresses whatever is staged in the input buffer and, on success, rewinds
// next_in_ to the head of input_buffer_ so the buffer can be refilled.
Status SnappyOutputBuffer::DeflateBuffered() {
  const Status deflate_status = Deflate();
  if (!deflate_status.ok()) {
    return deflate_status;
  }
  // A successful Deflate() consumes all pending input.
  DCHECK(avail_in_ == 0);
  next_in_ = input_buffer_.get();
  return Status::OK();
}

// Appends the filled portion of the output buffer to the file.
//
// Does nothing and returns OK when the buffer holds no data. The buffer is
// marked empty again only if the append succeeds; on failure its contents and
// bookkeeping are left unchanged and the append error is returned.
Status SnappyOutputBuffer::FlushOutputBufferToFile() {
  const size_t pending_bytes = output_buffer_capacity_ - avail_out_;
  if (pending_bytes == 0) {
    return Status::OK();
  }

  Status append_status = file_->Append(StringPiece(
      reinterpret_cast<char*>(output_buffer_.get()), pending_bytes));
  if (!append_status.ok()) {
    return append_status;
  }

  // Everything was handed to the file: the whole buffer is free again.
  next_out_ = output_buffer_.get();
  avail_out_ = output_buffer_capacity_;
  return append_status;
}

// Compresses the `avail_in_` bytes at `next_in_` into a single Snappy block
// and queues it in the output buffer.
//
// Each block is emitted as a 4-byte length prefix (most significant byte
// first) followed by the compressed bytes. Does nothing when there is no
// pending input.
//
// Returns:
//   OK on success, after which all pending input has been consumed
//     (avail_in_ == 0 and next_in_ advanced past it).
//   DataLoss if Snappy compression fails.
//   Any error from AddToOutputBuffer(); in that case next_in_ and avail_in_
//     are left unchanged.
Status SnappyOutputBuffer::Deflate() {
  if (avail_in_ == 0) {
    return Status::OK();
  }
  string output;
  if (!port::Snappy_Compress(next_in_, avail_in_, &output)) {
    return errors::DataLoss("Snappy_Compress failed");
  }

  // Write length of compressed block to output buffer.
  // The prefix lives on the stack so that the early returns below cannot leak
  // it (the previous heap allocation was only freed on the success path).
  char compressed_length_array[4];
  for (int i = 0; i < 4; i++) {
    // Big endian: the most significant byte is stored first.
    compressed_length_array[i] =
        static_cast<char>(output.size() >> (8 * (3 - i)));
  }
  TF_RETURN_IF_ERROR(AddToOutputBuffer(compressed_length_array, 4));

  // Write compressed output to buffer.
  TF_RETURN_IF_ERROR(AddToOutputBuffer(output.data(), output.size()));

  // Mark all pending input as consumed.
  next_in_ += avail_in_;
  avail_in_ = 0;

  return Status::OK();
}

}  // namespace io
}  // namespace tensorflow