OLD | NEW |
(Empty) | |
| 1 /* |
| 2 * |
| 3 * Copyright 2015, Google Inc. |
| 4 * All rights reserved. |
| 5 * |
| 6 * Redistribution and use in source and binary forms, with or without |
| 7 * modification, are permitted provided that the following conditions are |
| 8 * met: |
| 9 * |
| 10 * * Redistributions of source code must retain the above copyright |
| 11 * notice, this list of conditions and the following disclaimer. |
| 12 * * Redistributions in binary form must reproduce the above |
| 13 * copyright notice, this list of conditions and the following disclaimer |
| 14 * in the documentation and/or other materials provided with the |
| 15 * distribution. |
| 16 * * Neither the name of Google Inc. nor the names of its |
| 17 * contributors may be used to endorse or promote products derived from |
| 18 * this software without specific prior written permission. |
| 19 * |
| 20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 21 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 22 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
| 23 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
| 24 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
| 25 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
| 26 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
| 27 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
| 28 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
| 29 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
| 30 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| 31 * |
| 32 */ |
| 33 |
| 34 #include <grpc++/impl/proto_utils.h> |
| 35 |
| 36 #include <climits> |
| 37 |
| 38 #include <grpc/grpc.h> |
| 39 #include <grpc/byte_buffer.h> |
| 40 #include <grpc/byte_buffer_reader.h> |
| 41 #include <grpc/support/log.h> |
| 42 #include <grpc/support/slice.h> |
| 43 #include <grpc/support/slice_buffer.h> |
| 44 #include <grpc/support/port_platform.h> |
| 45 #include <grpc++/support/config.h> |
| 46 |
| 47 #include "src/core/profiling/timers.h" |
| 48 |
| 49 const int kMaxBufferLength = 8192; |
| 50 |
| 51 class GrpcBufferWriter GRPC_FINAL |
| 52 : public ::grpc::protobuf::io::ZeroCopyOutputStream { |
| 53 public: |
| 54 explicit GrpcBufferWriter(grpc_byte_buffer** bp, |
| 55 int block_size = kMaxBufferLength) |
| 56 : block_size_(block_size), byte_count_(0), have_backup_(false) { |
| 57 *bp = grpc_raw_byte_buffer_create(NULL, 0); |
| 58 slice_buffer_ = &(*bp)->data.raw.slice_buffer; |
| 59 } |
| 60 |
| 61 ~GrpcBufferWriter() GRPC_OVERRIDE { |
| 62 if (have_backup_) { |
| 63 gpr_slice_unref(backup_slice_); |
| 64 } |
| 65 } |
| 66 |
| 67 bool Next(void** data, int* size) GRPC_OVERRIDE { |
| 68 if (have_backup_) { |
| 69 slice_ = backup_slice_; |
| 70 have_backup_ = false; |
| 71 } else { |
| 72 slice_ = gpr_slice_malloc(block_size_); |
| 73 } |
| 74 *data = GPR_SLICE_START_PTR(slice_); |
| 75 // On win x64, int is only 32bit |
| 76 GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); |
| 77 byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); |
| 78 gpr_slice_buffer_add(slice_buffer_, slice_); |
| 79 return true; |
| 80 } |
| 81 |
| 82 void BackUp(int count) GRPC_OVERRIDE { |
| 83 gpr_slice_buffer_pop(slice_buffer_); |
| 84 if (count == block_size_) { |
| 85 backup_slice_ = slice_; |
| 86 } else { |
| 87 backup_slice_ = |
| 88 gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count); |
| 89 gpr_slice_buffer_add(slice_buffer_, slice_); |
| 90 } |
| 91 have_backup_ = true; |
| 92 byte_count_ -= count; |
| 93 } |
| 94 |
| 95 grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; } |
| 96 |
| 97 private: |
| 98 const int block_size_; |
| 99 int64_t byte_count_; |
| 100 gpr_slice_buffer* slice_buffer_; |
| 101 bool have_backup_; |
| 102 gpr_slice backup_slice_; |
| 103 gpr_slice slice_; |
| 104 }; |
| 105 |
| 106 class GrpcBufferReader GRPC_FINAL |
| 107 : public ::grpc::protobuf::io::ZeroCopyInputStream { |
| 108 public: |
| 109 explicit GrpcBufferReader(grpc_byte_buffer* buffer) |
| 110 : byte_count_(0), backup_count_(0) { |
| 111 grpc_byte_buffer_reader_init(&reader_, buffer); |
| 112 } |
| 113 ~GrpcBufferReader() GRPC_OVERRIDE { |
| 114 grpc_byte_buffer_reader_destroy(&reader_); |
| 115 } |
| 116 |
| 117 bool Next(const void** data, int* size) GRPC_OVERRIDE { |
| 118 if (backup_count_ > 0) { |
| 119 *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) - |
| 120 backup_count_; |
| 121 GPR_ASSERT(backup_count_ <= INT_MAX); |
| 122 *size = (int)backup_count_; |
| 123 backup_count_ = 0; |
| 124 return true; |
| 125 } |
| 126 if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) { |
| 127 return false; |
| 128 } |
| 129 gpr_slice_unref(slice_); |
| 130 *data = GPR_SLICE_START_PTR(slice_); |
| 131 // On win x64, int is only 32bit |
| 132 GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX); |
| 133 byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_); |
| 134 return true; |
| 135 } |
| 136 |
| 137 void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; } |
| 138 |
| 139 bool Skip(int count) GRPC_OVERRIDE { |
| 140 const void* data; |
| 141 int size; |
| 142 while (Next(&data, &size)) { |
| 143 if (size >= count) { |
| 144 BackUp(size - count); |
| 145 return true; |
| 146 } |
| 147 // size < count; |
| 148 count -= size; |
| 149 } |
| 150 // error or we have too large count; |
| 151 return false; |
| 152 } |
| 153 |
| 154 grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { |
| 155 return byte_count_ - backup_count_; |
| 156 } |
| 157 |
| 158 private: |
| 159 int64_t byte_count_; |
| 160 int64_t backup_count_; |
| 161 grpc_byte_buffer_reader reader_; |
| 162 gpr_slice slice_; |
| 163 }; |
| 164 |
| 165 namespace grpc { |
| 166 |
| 167 Status SerializeProto(const grpc::protobuf::Message& msg, |
| 168 grpc_byte_buffer** bp) { |
| 169 GPR_TIMER_SCOPE("SerializeProto", 0); |
| 170 int byte_size = msg.ByteSize(); |
| 171 if (byte_size <= kMaxBufferLength) { |
| 172 gpr_slice slice = gpr_slice_malloc(byte_size); |
| 173 GPR_ASSERT(GPR_SLICE_END_PTR(slice) == |
| 174 msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice))); |
| 175 *bp = grpc_raw_byte_buffer_create(&slice, 1); |
| 176 gpr_slice_unref(slice); |
| 177 return Status::OK; |
| 178 } else { |
| 179 GrpcBufferWriter writer(bp); |
| 180 return msg.SerializeToZeroCopyStream(&writer) |
| 181 ? Status::OK |
| 182 : Status(StatusCode::INTERNAL, "Failed to serialize message"); |
| 183 } |
| 184 } |
| 185 |
| 186 Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg, |
| 187 int max_message_size) { |
| 188 GPR_TIMER_SCOPE("DeserializeProto", 0); |
| 189 if (!buffer) { |
| 190 return Status(StatusCode::INTERNAL, "No payload"); |
| 191 } |
| 192 GrpcBufferReader reader(buffer); |
| 193 ::grpc::protobuf::io::CodedInputStream decoder(&reader); |
| 194 if (max_message_size > 0) { |
| 195 decoder.SetTotalBytesLimit(max_message_size, max_message_size); |
| 196 } |
| 197 if (!msg->ParseFromCodedStream(&decoder)) { |
| 198 return Status(StatusCode::INTERNAL, msg->InitializationErrorString()); |
| 199 } |
| 200 if (!decoder.ConsumedEntireMessage()) { |
| 201 return Status(StatusCode::INTERNAL, "Did not read entire message"); |
| 202 } |
| 203 return Status::OK; |
| 204 } |
| 205 |
| 206 } // namespace grpc |
OLD | NEW |