Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1370)

Side by Side Diff: content/browser/streams/stream.cc

Issue 12335087: Implement the Stream registry in content (Closed) Base URL: http://git.chromium.org/chromium/src.git@master
Patch Set: Code review fixes Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/streams/stream.h"
6
7 #include "base/bind.h"
8 #include "base/message_loop_proxy.h"
9 #include "content/browser/streams/stream_read_observer.h"
10 #include "content/browser/streams/stream_registry.h"
11 #include "content/browser/streams/stream_write_observer.h"
12 #include "net/base/io_buffer.h"
13
14 namespace {
15 // Start throttling the connection at about 1MB
16 const size_t kDeferSizeThreshold = 40 * 32768;
17 }
18
19 namespace content {
20
21 Stream::Stream(StreamRegistry* registry, const GURL& security_origin,
22 const GURL& url)
23 : bytes_read_(0),
24 can_add_data_(true),
25 security_origin_(security_origin),
26 stream_url_(url),
27 data_length_(0),
28 registry_(registry) {
29 CreateByteStream(base::MessageLoopProxy::current(),
30 base::MessageLoopProxy::current(),
31 kDeferSizeThreshold,
32 &writer_,
33 &reader_);
34
35 // Setup callback for writing.
36 writer_->RegisterCallback(base::Bind(&Stream::OnSpaceAvailable, this));
37 reader_->RegisterCallback(base::Bind(&Stream::OnDataAvailable, this));
38
39 registry_->RegisterStream(this);
40 }
41
42 Stream::~Stream() {
43 }
44
45 void Stream::AddReadObserver(StreamReadObserver* observer) {
46 read_observers_.AddObserver(observer);
47 }
48
49 void Stream::RemoveReadObserver(StreamReadObserver* observer) {
50 read_observers_.RemoveObserver(observer);
51 }
52
53 void Stream::AddWriteObserver(StreamWriteObserver* observer) {
54 write_observers_.AddObserver(observer);
55 }
56
57 void Stream::RemoveWriteObserver(StreamWriteObserver* observer) {
58 write_observers_.RemoveObserver(observer);
59 }
60
61 void Stream::AddData(scoped_refptr<net::IOBuffer> buffer, size_t size) {
62 can_add_data_ = writer_->Write(buffer, size);
63 }
64
65 void Stream::Finalize() {
66 writer_->Close(DOWNLOAD_INTERRUPT_REASON_NONE);
67 writer_.reset(NULL);
68
69 OnDataAvailable();
70 }
71
72 bool Stream::ReadRawData(net::IOBuffer* buf, int buf_size, int* bytes_read) {
73 if (!data_) {
74 data_length_ = 0;
75 bytes_read_ = 0;
76 ByteStreamReader::StreamState state = reader_->Read(&data_, &data_length_);
77 switch (state) {
78 case ByteStreamReader::STREAM_HAS_DATA:
79 break;
80 case ByteStreamReader::STREAM_COMPLETE:
81 registry_->OnStreamConsumed(this);
82 return true;
83 case ByteStreamReader::STREAM_EMPTY:
84 return false;
85 }
86 }
87
88 size_t remaining_bytes = data_length_ - bytes_read_;
89 size_t to_read =
90 static_cast<size_t>(buf_size) < remaining_bytes ?
91 buf_size : remaining_bytes;
92 memcpy(buf->data(), data_->data() + bytes_read_, to_read);
93 bytes_read_ += to_read;
94 if (bytes_read_ >= data_length_)
95 data_ = NULL;
96
97 *bytes_read = to_read;
98 return true;
99 }
100
101 void Stream::OnSpaceAvailable() {
102 can_add_data_ = true;
103 FOR_EACH_OBSERVER(StreamWriteObserver,
104 write_observers_,
105 OnSpaceAvailable(this));
106 }
107
108 void Stream::OnDataAvailable() {
109 FOR_EACH_OBSERVER(StreamReadObserver,
110 read_observers_,
111 OnDataAvailable(this));
112 }
113
114 } // namespace content
115
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698