Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: chrome/browser/chromeos/web_socket_proxy.cc

Issue 6801008: Websocket to TCP proxy running in a separate thread (only on ChromeOS). (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: rebase+minor Created 9 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "web_socket_proxy.h"
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <string.h>
10
11 #include <algorithm>
12 #include <limits>
13 #include <list>
14 #include <map>
15 #include <vector>
16
17 #include <arpa/inet.h>
18 #include <errno.h>
19 #include <fcntl.h>
20 #include <netinet/in.h>
21 #include <signal.h>
22 #include <sys/socket.h>
23 #include <sys/types.h>
24 #include <sys/wait.h>
25
26 #include "base/base64.h"
27 #include "base/basictypes.h"
28 #include "base/logging.h"
29 #include "base/md5.h"
30 #include "base/memory/scoped_ptr.h"
31 #include "base/string_number_conversions.h"
32 #include "base/string_util.h"
33 #include "content/browser/browser_thread.h"
34 #include "content/common/notification_service.h"
35 #include "content/common/notification_type.h"
36 // TODO(dilmah): enable this once webSocketProxyPrivate.getToken is wired.
37 #if 0
38 #include "chrome/browser/internal_auth.h"
39 #endif
40 #include "third_party/libevent/evdns.h"
41 #include "third_party/libevent/event.h"
42
43 namespace chromeos {
44
45 namespace {
46
47 const uint8 kCRLF[] = "\r\n";
48 const uint8 kCRLFCRLF[] = "\r\n\r\n";
49
50 // Not a constant but preprocessor definition for easy concatenation.
51 #define kProxyPath "/tcpproxy"
52
53 // Returns true on success.
54 bool SetNonBlock(int fd) {
55 int flags = fcntl(fd, F_GETFL, 0);
56 return flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
57 }
58
59 // Returns true on success.
60 bool IgnoreSigPipe() {
61 struct sigaction sa;
62 sa.sa_handler = SIG_IGN;
63 sa.sa_flags = 0;
64 if (sigemptyset(&sa.sa_mask) || sigaction(SIGPIPE, &sa, 0)) {
65 LOG(ERROR) << "WebSocketProxy: Failed to disable sigpipe";
66 return false;
67 }
68 return true;
69 }
70
71 int CountSpaces(const std::string& s) {
72 static const uint8 kSpaceOctet = 0x20;
73 int rv = 0;
74 for (size_t i = 0; i < s.size(); ++i)
75 rv += (s[i] == kSpaceOctet);
76 return rv;
77 }
78
79 std::string FetchLowerCasedASCIISnippet(uint8* begin, uint8* end) {
80 std::string rv;
81 for (; begin < end; ++begin) {
82 if (!isascii(*begin))
83 return rv;
84 rv += base::ToLowerASCII(*begin);
85 }
86 return rv;
87 }
88
89 // Returns true on success.
90 bool FetchDecimalDigits(const std::string& s, uint32* result) {
91 *result = 0;
92 bool got_something = false;
93 for (size_t i = 0; i < s.size(); ++i) {
94 if (IsAsciiDigit(s[i])) {
95 got_something = true;
96 if (*result > std::numeric_limits<uint32>::max() / 10)
97 return false;
98 *result *= 10;
99 int digit = s[i] - '0';
100 if (*result > std::numeric_limits<uint32>::max() - digit)
101 return false;
102 *result += digit;
103 }
104 }
105 return got_something;
106 }
107
108 // Parses "token:hostname:port:" string. Returns true on success.
109 bool FetchTokenNamePort(
110 uint8* begin, uint8* end,
111 std::string* token, std::string* name, uint32* port) {
112 std::string input(begin, end);
113 if (input[input.size() - 1] != ':')
114 return false;
115 input.resize(input.size() - 1);
116
117 size_t pos = input.find_last_of(':');
118 if (pos == std::string::npos)
119 return false;
120 std::string port_str(input, pos + 1);
121 if (port_str.empty())
122 return false;
123 const char kAsciiDigits[] = "0123456789";
124 COMPILE_ASSERT(sizeof(kAsciiDigits) == 10 + 1, mess_with_digits);
125 if (port_str.find_first_not_of(kAsciiDigits) != std::string::npos)
126 return false;
127 if (!FetchDecimalDigits(port_str, port) ||
128 *port <= 0 ||
129 *port >= (1 << 16)) {
130 return false;
131 }
132 input.resize(pos);
133
134 pos = input.find_first_of(':');
135 if (pos == std::string::npos)
136 return false;
137 token->assign(input, 0, pos);
138 name->assign(input, pos + 1, std::string::npos);
139 return !name->empty();
140 }
141
142 std::string FetchExtensionIdFromOrigin(const std::string origin) {
143 // Origin of extension looks like "chrome-extension://EXTENSION_ID".
144 return origin.substr(origin.find_last_of('/'));
145 }
146
147 inline size_t strlen(const uint8* s) {
148 return ::strlen(reinterpret_cast<const char*>(s));
149 }
150
151 void SendNotification() {
152 DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
153 NotificationService::current()->Notify(
154 NotificationType::WEB_SOCKET_PROXY_STARTED,
155 NotificationService::AllSources(), NotificationService::NoDetails());
156 }
157
158 class Conn;
159
160 // Websocket to TCP proxy server.
161 class Serv {
162 public:
163 Serv(const std::vector<std::string>& allowed_origins,
164 struct sockaddr* addr, int addr_len);
165 ~Serv();
166
167 // Do not call it twice.
168 void Run();
169
170 // Terminates running server (should be called on a different thread).
171 void Shutdown();
172
173 void ZapConn(Conn*);
174 void MarkConnImportance(Conn*, bool important);
175 Conn* GetFreshConn();
176 bool IsConnSane(Conn*);
177 bool IsOriginAllowed(const std::string& origin);
178 void CloseAll();
179
180 static void OnConnect(int listening_sock, short event, void*);
181 static void OnShutdownRequest(int fd, short event, void*);
182
183 struct event_base* evbase() { return evbase_; }
184
185 // Checked against value of Origin field specified
186 // in a client websocket handshake.
187 std::vector<std::string> allowed_origins_;
188
189 // Address to listen incoming websocket connections.
190 struct sockaddr* addr_;
191 int addr_len_;
192
193 // Libevent base.
194 struct event_base* evbase_;
195
196 // Socket to listen incoming websocket connections.
197 int listening_sock_;
198
199 // Event on this descriptor triggers server shutdown.
200 int shutdown_descriptor_[2];
201
202 // Flag whether shutdown has been requested.
203 bool shutdown_requested_;
204
205 // List of pending connections; We are trying to keep size of this list
206 // below kConnPoolLimit in LRU fashion.
207 typedef std::list<Conn*> ConnPool;
208 ConnPool conn_pool_;
209
210 // Reverse map to look up a connection in a conn_pool.
211 typedef std::map<Conn*, ConnPool::iterator> RevMap;
212 RevMap rev_map_;
213
214 scoped_ptr<struct event> connection_event_;
215 scoped_ptr<struct event> shutdown_event_;
216
217 DISALLOW_COPY_AND_ASSIGN(Serv);
218 };
219
220 // Connection (amalgamates both channels between proxy and javascript and
221 // between proxy and destination).
222 class Conn {
223 public:
224 enum Phase {
225 // Initial stage of connection.
226 PHASE_WAIT_HANDSHAKE,
227 PHASE_WAIT_DESTFRAME,
228 PHASE_WAIT_DESTCONNECT,
229
230 // Operational stage of connection.
231 PHASE_OUTSIDE_FRAME,
232 PHASE_INSIDE_FRAME_BASE64,
233 PHASE_INSIDE_FRAME_SKIP,
234
235 // Terminal stage of connection.
236 PHASE_SHUT, // Closing handshake was emitted, buffers may be pending.
237 PHASE_DEFUNCT // Connection was nuked.
238 };
239
240 // Channel structure (either proxy<->javascript or proxy<->destination).
241 class Chan {
242 public:
243 explicit Chan(Conn* master)
244 : master_(master), sock_(-1), bev_(NULL), write_pending_(false) {
245 }
246
247 ~Chan() {
248 Zap();
249 }
250
251 // Returns true on success.
252 bool Write(const void* data, size_t size) {
253 if (bev_ == NULL || sock_ < 0)
254 return false;
255 write_pending_ = true;
256 return (0 == bufferevent_write(bev_, data, size));
257 }
258
259 void Zap() {
260 if (bev_) {
261 bufferevent_disable(bev_, EV_READ | EV_WRITE);
262 bufferevent_free(bev_);
263 bev_ = NULL;
264 }
265 if (sock_ >= 0) {
266 shutdown(sock_, SHUT_RDWR);
267 close(sock_);
268 sock_ = -1;
269 }
270 write_pending_ = false;
271 master_->ConsiderSuicide();
272 }
273
274 void Shut() {
275 if (!write_pending_)
276 Zap();
277 }
278
279 int& sock() { return sock_; }
280 bool& write_pending() { return write_pending_; }
281 struct bufferevent*& bev() { return bev_; }
282
283 private:
284 Conn* master_;
285 int sock_; // UNIX descriptor.
286 struct bufferevent* bev_;
287 bool write_pending_; // Whether write buffer is not flushed yet.
288 };
289
290 // Status of processing incoming data.
291 enum Status {
292 STATUS_OK,
293 STATUS_INCOMPLETE, // Not all required data is present in buffer yet.
294 STATUS_SKIP,
295 STATUS_ABORT // Data is invalid. We must shut connection.
296 };
297
298 // Unfortunately evdns callbacks are uncancellable,
299 // so potentially we can receive callback for a deleted Conn.
300 // Even worse, storage of deleted Conn may be reused
301 // for a new connection and new connection can receive callback
302 // destined for deleted Conn.
303 // EventKey is introduced in order to prevent that.
304 typedef void* EventKey;
305 typedef std::map<EventKey, Conn*> EventKeyMap;
306
307 explicit Conn(Serv* master);
308 ~Conn();
309
310 static Conn* Get(EventKey evkey);
311
312 void Shut();
313
314 void ConsiderSuicide();
315
316 Status ConsumeHeader(struct evbuffer*);
317 Status ConsumeDestframe(struct evbuffer*);
318 Status ConsumeFrameHeader(struct evbuffer*);
319 Status ProcessFrameData(struct evbuffer*);
320
321 // Returns true on success.
322 bool EmitHandshake(Chan*);
323
324 // Attempts to establish second connection (to remote TCP service).
325 // Returns true on success.
326 bool TryConnectDest(const struct sockaddr*, socklen_t);
327
328 // Used as libevent callbacks.
329 static void OnDestConnectTimeout(int, short, EventKey);
330 static void OnPrimchanRead(struct bufferevent*, EventKey);
331 static void OnPrimchanWrite(struct bufferevent*, EventKey);
332 static void OnPrimchanError(struct bufferevent*, short what, EventKey);
333 static void OnDestResolutionIPv4(int result, char type, int count,
334 int ttl, void* addr_list, EventKey);
335 static void OnDestResolutionIPv6(int result, char type, int count,
336 int ttl, void* addr_list, EventKey);
337 static void OnDestchanRead(struct bufferevent*, EventKey);
338 static void OnDestchanWrite(struct bufferevent*, EventKey);
339 static void OnDestchanError(struct bufferevent*, short what, EventKey);
340
341 Chan& primchan() { return primchan_; }
342 EventKey evkey() const { return evkey_; }
343
344 private:
345 Serv* master_;
346 Phase phase_;
347
348 // We maintain two channels per Conn:
349 // primary channel is websocket connection.
350 Chan primchan_;
351 // Destination channel is a proxied connection.
352 Chan destchan_;
353
354 EventKey evkey_;
355
356 // Header fields supplied by client at initial websocket handshake.
357 std::map<std::string, std::string> header_fields_;
358
359 // Cryptohashed answer for websocket handshake.
360 MD5Digest handshake_response_;
361
362 // Hostname and port of destination socket.
363 // Websocket client supplies them in first data frame (destframe).
364 std::string destname_;
365 uint32 destport_;
366
367 // We try to DNS resolve hostname in both IPv4 and IPv6 domains.
368 // Track resolution failures here.
369 bool destresolution_ipv4_failed_;
370 bool destresolution_ipv6_failed_;
371
372 // Used to schedule a timeout for initial phase of connection.
373 scoped_ptr<struct event> destconnect_timeout_event_;
374
375 static EventKeyMap evkey_map_;
376 static EventKey last_evkey_;
377
378 DISALLOW_COPY_AND_ASSIGN(Conn);
379 };
380
381 Serv::Serv(
382 const std::vector<std::string>& allowed_origins,
383 struct sockaddr* addr, int addr_len)
384 : allowed_origins_(allowed_origins),
385 addr_(addr),
386 addr_len_(addr_len),
387 evbase_(NULL),
388 listening_sock_(-1),
389 shutdown_requested_(false) {
390 std::sort(allowed_origins_.begin(), allowed_origins_.end());
391 shutdown_descriptor_[0] = -1;
392 shutdown_descriptor_[1] = -1;
393 }
394
395 Serv::~Serv() {
396 CloseAll();
397 }
398
399 void Serv::Run() {
400 if (evbase_ || shutdown_requested_)
401 return;
402
403 evbase_ = event_init();
404 if (!evbase_) {
405 LOG(ERROR) << "WebSocketProxy: Couldn't create libevent base";
406 return;
407 }
408
409 if (pipe(shutdown_descriptor_)) {
410 LOG(ERROR) << "WebSocketProxy: Failed to create shutdown pipe";
411 return;
412 }
413
414 listening_sock_ = socket(AF_INET, SOCK_STREAM, 0);
415 if (listening_sock_ < 0) {
416 LOG(ERROR) << "WebSocketProxy: Failed to create socket";
417 return;
418 }
419 if (bind(listening_sock_, addr_, addr_len_)) {
420 LOG(ERROR) << "WebSocketProxy: Failed to bind server socket";
421 return;
422 }
423 if (listen(listening_sock_, 12)) {
424 LOG(ERROR) << "WebSocketProxy: Failed to listen server socket";
425 return;
426 }
427 {
428 int on = 1;
429 setsockopt(listening_sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
430 }
431 if (!SetNonBlock(listening_sock_)) {
432 LOG(ERROR) << "WebSocketProxy: Failed to go non block";
433 return;
434 }
435
436 connection_event_.reset(new struct event);
437 event_set(connection_event_.get(), listening_sock_, EV_READ | EV_PERSIST,
438 &OnConnect, this);
439 event_base_set(evbase_, connection_event_.get());
440 if (event_add(connection_event_.get(), NULL)) {
441 LOG(ERROR) << "WebSocketProxy: Failed to add listening event";
442 return;
443 }
444
445 shutdown_event_.reset(new struct event);
446 event_set(shutdown_event_.get(), shutdown_descriptor_[0], EV_READ,
447 &OnShutdownRequest, this);
448 event_base_set(evbase_, shutdown_event_.get());
449 if (event_add(shutdown_event_.get(), NULL)) {
450 LOG(ERROR) << "WebSocketProxy: Failed to add shutdown event";
451 return;
452 }
453
454 if (evdns_init()) {
455 LOG(ERROR) << "WebSocketProxy: Failed to initialize evDNS";
456 return;
457 }
458 if (!IgnoreSigPipe()) {
459 LOG(ERROR) << "WebSocketProxy: Failed to ignore SIGPIPE";
460 return;
461 }
462
463 BrowserThread::PostTask(
464 BrowserThread::UI, FROM_HERE,
465 NewRunnableFunction(&SendNotification));
466
467 LOG(INFO) << "WebSocketProxy: Starting event dispatch loop.";
468 event_base_dispatch(evbase_);
469 if (shutdown_requested_)
470 LOG(INFO) << "WebSocketProxy: Event dispatch loop terminated upon request";
471 else
472 LOG(ERROR) << "WebSocketProxy: Event dispatch loop terminated unexpectedly";
473 CloseAll();
474 }
475
476 void Serv::Shutdown() {
477 if (1 != write(shutdown_descriptor_[1], ".", 1))
478 NOTREACHED();
479 }
480
481 void Serv::CloseAll() {
482 while (!conn_pool_.empty())
483 ZapConn(conn_pool_.back());
484 if (listening_sock_ >= 0) {
485 shutdown(listening_sock_, SHUT_RDWR);
486 close(listening_sock_);
487 }
488 for (int i = 0; i < 2; ++i) {
489 if (shutdown_descriptor_[i] >= 0) {
490 shutdown_descriptor_[i] = -1;
491 close(shutdown_descriptor_[i]);
492 }
493 }
494 if (shutdown_event_.get()) {
495 event_del(shutdown_event_.get());
496 shutdown_event_.reset();
497 }
498 if (connection_event_.get()) {
499 event_del(connection_event_.get());
500 connection_event_.reset();
501 }
502 if (evbase_) {
503 event_base_free(evbase_);
504 evbase_ = NULL;
505 }
506 }
507
508 void Serv::ZapConn(Conn* cs) {
509 RevMap::iterator rit = rev_map_.find(cs);
510 if (rit != rev_map_.end()) {
511 conn_pool_.erase(rit->second);
512 rev_map_.erase(rit);
513 delete cs;
514 }
515 }
516
517 void Serv::MarkConnImportance(Conn* cs, bool important) {
518 if (conn_pool_.size() < WebSocketProxy::kConnPoolLimit / 4) {
519 // Fast common path.
520 return;
521 }
522 RevMap::iterator rit = rev_map_.find(cs);
523 if (rit != rev_map_.end()) {
524 ConnPool::iterator it = rit->second;
525 CHECK(*it == cs);
526 if (important && it == conn_pool_.begin()) {
527 // Already at the top. Shortcut.
528 return;
529 }
530 conn_pool_.erase(it);
531 }
532 if (important) {
533 conn_pool_.push_front(cs);
534 rev_map_[cs] = conn_pool_.begin();
535 } else {
536 conn_pool_.push_back(cs);
537 rev_map_[cs] = conn_pool_.end();
538 --rev_map_[cs];
539 }
540 }
541
542 Conn* Serv::GetFreshConn() {
543 if (conn_pool_.size() > WebSocketProxy::kConnPoolLimit) {
544 // Connections overflow. Shut those oldest not active.
545 ConnPool::iterator it = conn_pool_.end();
546 --it;
547 for (int i = conn_pool_.size() - WebSocketProxy::kConnPoolLimit; i-- > 0;) {
548 // Shut may invalidate an iterator; hence postdecrement.
549 (*it--)->Shut();
550 }
551 if (conn_pool_.size() > WebSocketProxy::kConnPoolLimit + 12) {
552 // Connections overflow. Zap the oldest not active.
553 ZapConn(conn_pool_.back());
554 }
555 }
556 Conn* cs = new Conn(this);
557 conn_pool_.push_front(cs);
558 rev_map_[cs] = conn_pool_.begin();
559 return cs;
560 }
561
562 bool Serv::IsConnSane(Conn* cs) {
563 return rev_map_.find(cs) != rev_map_.end();
564 }
565
566 bool Serv::IsOriginAllowed(const std::string& origin) {
567 return allowed_origins_.empty() || std::binary_search(
568 allowed_origins_.begin(), allowed_origins_.end(), origin);
569 }
570
571 // static
572 void Serv::OnConnect(int listening_sock, short event, void* ctx) {
573 Serv* self = static_cast<Serv*>(ctx);
574 Conn* cs = self->GetFreshConn();
575 cs->primchan().sock() = accept(listening_sock, NULL, NULL);
576 if (cs->primchan().sock() < 0
577 || !SetNonBlock(cs->primchan().sock())) {
578 // Read readiness was triggered on listening socket
579 // yet we failed to accept a connection; definitely weird.
580 NOTREACHED();
581 self->ZapConn(cs);
582 return;
583 }
584
585 cs->primchan().bev() = bufferevent_new(
586 cs->primchan().sock(),
587 &Conn::OnPrimchanRead, &Conn::OnPrimchanWrite, &Conn::OnPrimchanError,
588 cs->evkey());
589 if (cs->primchan().bev() == NULL) {
590 self->ZapConn(cs);
591 return;
592 }
593 bufferevent_base_set(self->evbase_, cs->primchan().bev());
594 bufferevent_setwatermark(
595 cs->primchan().bev(), EV_READ, 0, WebSocketProxy::kReadBufferLimit);
596 if (bufferevent_enable(cs->primchan().bev(), EV_READ | EV_WRITE)) {
597 self->ZapConn(cs);
598 return;
599 }
600 }
601
602 // static
603 void Serv::OnShutdownRequest(int fd, short event, void* ctx) {
604 Serv* self = static_cast<Serv*>(ctx);
605 self->shutdown_requested_ = true;
606 event_base_loopbreak(self->evbase_);
607 }
608
609 Conn::Conn(Serv* master)
610 : master_(master),
611 phase_(PHASE_WAIT_HANDSHAKE),
612 primchan_(this),
613 destchan_(this),
614 destresolution_ipv4_failed_(false),
615 destresolution_ipv6_failed_(false) {
616 while (evkey_map_.find(last_evkey_) != evkey_map_.end()) {
617 evkey_ = last_evkey_ =
618 reinterpret_cast<EventKey>(reinterpret_cast<size_t>(last_evkey_) + 1);
619 }
620 evkey_map_[evkey_] = this;
621 // Schedule timeout for initial phase of connection.
622 destconnect_timeout_event_.reset(new struct event);
623 evtimer_set(destconnect_timeout_event_.get(),
624 &OnDestConnectTimeout, evkey_);
625 event_base_set(master_->evbase(),
626 destconnect_timeout_event_.get());
627
628 struct timeval tv;
629 tv.tv_sec = 20;
630 tv.tv_usec = 0;
631 evtimer_add(destconnect_timeout_event_.get(), &tv);
632 }
633
634 Conn::~Conn() {
635 phase_ = PHASE_DEFUNCT;
636 event_del(destconnect_timeout_event_.get());
637 if (evkey_map_[evkey_] == this)
638 evkey_map_.erase(evkey_);
639 else
640 NOTREACHED();
641 }
642
643 Conn* Conn::Get(EventKey evkey) {
644 EventKeyMap::iterator it = evkey_map_.find(evkey);
645 if (it == evkey_map_.end())
646 return NULL;
647 Conn* cs = it->second;
648 if (cs == NULL ||
649 cs->evkey_ != evkey ||
650 cs->master_ == NULL ||
651 cs->phase_ < 0 ||
652 cs->phase_ > PHASE_SHUT ||
653 !cs->master_->IsConnSane(cs)) {
654 return NULL;
655 }
656 return cs;
657 }
658
659 void Conn::Shut() {
660 if (phase_ >= PHASE_SHUT)
661 return;
662 master_->MarkConnImportance(this, false);
663 static const uint8 closing_handshake[9] = { 0 };
664 primchan_.Write(closing_handshake, sizeof(closing_handshake));
665 primchan_.Shut();
666 destchan_.Shut();
667 phase_ = PHASE_SHUT;
668 }
669
670 void Conn::ConsiderSuicide() {
671 if (!primchan_.write_pending() && !destchan_.write_pending())
672 master_->ZapConn(this);
673 }
674
675 Conn::Status Conn::ConsumeHeader(struct evbuffer* evb) {
676 uint8* buf = EVBUFFER_DATA(evb);
677 size_t buf_size = EVBUFFER_LENGTH(evb);
678
679 static const uint8 kGetMagic[] = "GET " kProxyPath " ";
680 static const uint8 kKeyValueDelimiter[] = ": ";
681
682 if (buf_size <= 0)
683 return STATUS_INCOMPLETE;
684 if (!buf)
685 return STATUS_ABORT;
686 if (!std::equal(buf, buf + std::min(buf_size, strlen(kGetMagic)),
687 kGetMagic)) {
688 // Data head does not match what is expected.
689 return STATUS_ABORT;
690 }
691
692 if (buf_size >= WebSocketProxy::kHeaderLimit)
693 return STATUS_ABORT;
694 uint8* buf_end = buf + buf_size;
695 uint8* term_pos = std::search(buf, buf_end, kCRLFCRLF,
696 kCRLFCRLF + strlen(kCRLFCRLF));
697 uint8 key3[8]; // Notation (key3) matches websocket RFC.
698 if (buf_end - term_pos - strlen(kCRLFCRLF) < sizeof(key3))
699 return STATUS_INCOMPLETE;
700 term_pos += strlen(kCRLFCRLF);
701 memcpy(key3, term_pos, sizeof(key3));
702 term_pos += sizeof(key3);
703 // First line is "GET /tcpproxy" line, so we skip it.
704 uint8* pos = std::search(buf, term_pos, kCRLF, kCRLF + strlen(kCRLF));
705 if (pos == term_pos)
706 return STATUS_ABORT;
707 for (;;) {
708 pos += strlen(kCRLF);
709 if (term_pos - pos <
710 static_cast<ptrdiff_t>(sizeof(key3) + strlen(kCRLF))) {
711 return STATUS_ABORT;
712 }
713 if (term_pos - pos ==
714 static_cast<ptrdiff_t>(sizeof(key3) + strlen(kCRLF))) {
715 break;
716 }
717 uint8* npos = std::search(pos, term_pos, kKeyValueDelimiter,
718 kKeyValueDelimiter + strlen(kKeyValueDelimiter));
719 if (npos == term_pos)
720 return STATUS_ABORT;
721 std::string key = FetchLowerCasedASCIISnippet(pos, npos);
722 pos = std::search(npos += strlen(kKeyValueDelimiter), term_pos,
723 kCRLF, kCRLF + strlen(kCRLF));
724 if (pos == term_pos)
725 return STATUS_ABORT;
726 if (!key.empty())
727 header_fields_[key] = FetchLowerCasedASCIISnippet(npos, pos);
728 }
729
730 // Values of Upgrade and Connection fields are hardcoded in the protocol.
731 if (header_fields_["upgrade"] != "websocket" ||
732 header_fields_["connection"] != "upgrade") {
733 return STATUS_ABORT;
734 }
735
736 if (!master_->IsOriginAllowed(header_fields_["origin"]))
737 return STATUS_ABORT;
738
739 static const std::string kSecKey1 = "sec-websocket-key1";
740 static const std::string kSecKey2 = "sec-websocket-key2";
741 uint32 key_number1, key_number2;
742 if (!FetchDecimalDigits(header_fields_[kSecKey1],
743 &key_number1) ||
744 !FetchDecimalDigits(header_fields_[kSecKey2],
745 &key_number2)) {
746 return STATUS_ABORT;
747 }
748
749 // We limit incoming header size so following numbers shall not be too high.
750 int spaces1 = CountSpaces(header_fields_[kSecKey1]);
751 int spaces2 = CountSpaces(header_fields_[kSecKey2]);
752 if (spaces1 == 0 ||
753 spaces2 == 0 ||
754 key_number1 % spaces1 != 0 ||
755 key_number2 % spaces2 != 0) {
756 return STATUS_ABORT;
757 }
758
759 uint8 challenge[4 + 4 + sizeof(key3)];
760 uint32 part1 = htonl(key_number1 / spaces1);
761 uint32 part2 = htonl(key_number2 / spaces2);
762 memcpy(challenge, &part1, 4);
763 memcpy(challenge + 4, &part2, 4);
764 memcpy(challenge + sizeof(challenge) - sizeof(key3), key3, sizeof(key3));
765 MD5Sum(challenge, sizeof(challenge), &handshake_response_);
766
767 evbuffer_drain(evb, term_pos - buf);
768 return STATUS_OK;
769 }
770
771 bool Conn::EmitHandshake(Chan* chan) {
772 std::vector<std::string> boilerplate;
773 boilerplate.push_back("HTTP/1.1 101 WebSocket Protocol Handshake");
774 boilerplate.push_back("Upgrade: WebSocket");
775 boilerplate.push_back("Connection: Upgrade");
776
777 {
778 // Take care of Location field.
779 char buf[128];
780 int rv = snprintf(buf, sizeof(buf),
781 "Sec-WebSocket-Location: ws://%s%s",
782 header_fields_["host"].c_str(),
783 kProxyPath);
784 if (rv <= 0 || rv + 0u >= sizeof(buf))
785 return false;
786 boilerplate.push_back(buf);
787 }
788 {
789 // Take care of Origin field.
790 if (header_fields_.find("origin") != header_fields_.end()) {
791 char buf[128];
792 int rv = snprintf(buf, sizeof(buf),
793 "Sec-WebSocket-Origin: %s",
794 header_fields_["origin"].c_str());
795 if (rv <= 0 || rv + 0u >= sizeof(buf))
796 return false;
797 boilerplate.push_back(buf);
798 }
799 }
800
801 boilerplate.push_back("");
802 for (size_t i = 0; i < boilerplate.size(); ++i) {
803 if (!chan->Write(boilerplate[i].c_str(), boilerplate[i].size()) ||
804 !chan->Write(kCRLF, strlen(kCRLF))) {
805 return false;
806 }
807 }
808 return chan->Write(&handshake_response_, sizeof(handshake_response_));
809 }
810
811 Conn::Status Conn::ConsumeDestframe(struct evbuffer* evb) {
812 uint8* buf = EVBUFFER_DATA(evb);
813 size_t buf_size = EVBUFFER_LENGTH(evb);
814
815 if (buf_size < 1)
816 return STATUS_INCOMPLETE;
817 if (buf[0] != 0)
818 return STATUS_ABORT;
819 if (buf_size < 1 + 1)
820 return STATUS_INCOMPLETE;
821 uint8* buf_end = buf + buf_size;
822 uint8* term_pos = std::find(buf + 1, buf_end, 0xff);
823 if (term_pos == buf_end) {
824 if (buf_size >= WebSocketProxy::kHeaderLimit) {
825 // So big and still worth nothing.
826 return STATUS_ABORT;
827 }
828 return STATUS_INCOMPLETE;
829 }
830
831 std::string token;
832 if (!FetchTokenNamePort(buf + 1, term_pos, &token, &destname_, &destport_))
833 return STATUS_ABORT;
834 // TODO(dilmah): enable this once webSocketProxyPrivate.getToken is wired.
835 #if 0
836 std::map<std::string, std::string> map;
837 map["hostname"] = destname_;
838 map["port"] = base::IntToString(destport_);
839 map["extension_id"] = FetchExtensionIdFromOrigin(header_fields_["origin"]);
840 if (!browser::InternalAuthVerification::VerifyToken(
841 "web_socket_proxy", token, map)) {
842 return STATUS_ABORT;
843 }
844 #endif
845
846 evbuffer_drain(evb, term_pos - buf + 1);
847 return STATUS_OK;
848 }
849
850 Conn::Status Conn::ConsumeFrameHeader(struct evbuffer* evb) {
851 uint8* buf = EVBUFFER_DATA(evb);
852 size_t buf_size = EVBUFFER_LENGTH(evb);
853
854 if (buf_size < 1)
855 return STATUS_INCOMPLETE;
856 if (buf[0] != 0)
857 return STATUS_ABORT;
858 evbuffer_drain(evb, 1);
859 return STATUS_OK;
860 }
861
862 Conn::Status Conn::ProcessFrameData(struct evbuffer* evb) {
863 uint8* buf = EVBUFFER_DATA(evb);
864 size_t buf_size = EVBUFFER_LENGTH(evb);
865
866 if (buf_size < 1)
867 return STATUS_INCOMPLETE;
868 uint8* buf_end = buf + buf_size;
869 uint8* term_pos = std::find(buf, buf_end, 0xff);
870 bool term_detected = (term_pos != buf_end);
871 if (term_detected)
872 buf_size = term_pos - buf;
873 switch (phase_) {
874 case PHASE_INSIDE_FRAME_BASE64: {
875 if (term_detected && buf_size % 4) {
876 // base64 is encoded in chunks of 4 bytes.
877 return STATUS_ABORT;
878 }
879 if (buf_size < 4) {
880 DCHECK(!term_detected);
881 return STATUS_INCOMPLETE;
882 }
883 size_t bytes_to_process_atm = (buf_size / 4) * 4;
884 std::string out_bytes;
885 base::Base64Decode(std::string(buf, buf + bytes_to_process_atm),
886 &out_bytes);
887 evbuffer_drain(evb, bytes_to_process_atm);
888 DCHECK(destchan_.bev() != NULL);
889 if (!destchan_.Write(out_bytes.c_str(), out_bytes.size()))
890 return STATUS_ABORT;
891 break;
892 }
893 case PHASE_INSIDE_FRAME_SKIP: {
894 evbuffer_drain(evb, buf_size);
895 break;
896 }
897 default: {
898 return STATUS_ABORT;
899 }
900 }
901 if (term_detected) {
902 evbuffer_drain(evb, 1);
903 return STATUS_OK;
904 }
905 return STATUS_INCOMPLETE;
906 }
907
908 bool Conn::TryConnectDest(const struct sockaddr* addr,
909 socklen_t addrlen) {
910 if (destchan_.sock() >= 0 || destchan_.bev() != NULL)
911 return false;
912 destchan_.sock() = socket(addr->sa_family, SOCK_STREAM, 0);
913 if (destchan_.sock() < 0)
914 return false;
915 if (!SetNonBlock(destchan_.sock()))
916 return false;
917 if (connect(destchan_.sock(), addr, addrlen)) {
918 if (errno != EINPROGRESS)
919 return false;
920 }
921 destchan_.bev() = bufferevent_new(
922 destchan_.sock(),
923 &OnDestchanRead, &OnDestchanWrite, &OnDestchanError,
924 evkey_);
925 if (destchan_.bev() == NULL)
926 return false;
927 if (bufferevent_base_set(master_->evbase(), destchan_.bev()))
928 return false;
929 bufferevent_setwatermark(
930 destchan_.bev(), EV_READ, 0, WebSocketProxy::kReadBufferLimit);
931 return !bufferevent_enable(destchan_.bev(), EV_READ | EV_WRITE);
932 }
933
934 // static
935 void Conn::OnPrimchanRead(struct bufferevent* bev, EventKey evkey) {
936 Conn* cs = Conn::Get(evkey);
937 if (bev == NULL ||
938 cs == NULL ||
939 bev != cs->primchan_.bev()) {
940 NOTREACHED();
941 return;
942 }
943 if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) <= 0)
944 return;
945 cs->master_->MarkConnImportance(cs, true);
946 for (;;) {
947 switch (cs->phase_) {
948 case PHASE_WAIT_HANDSHAKE: {
949 switch (cs->ConsumeHeader(EVBUFFER_INPUT(bev))) {
950 case STATUS_OK: {
951 break;
952 }
953 case STATUS_INCOMPLETE: {
954 return;
955 }
956 case STATUS_ABORT:
957 default: {
958 cs->master_->ZapConn(cs);
959 return;
960 }
961 }
962 // Header consumed OK. Do respond.
963 if (!cs->EmitHandshake(&cs->primchan_)) {
964 cs->master_->ZapConn(cs);
965 return;
966 }
967 cs->phase_ = PHASE_WAIT_DESTFRAME;
968 return;
969 }
970 case PHASE_WAIT_DESTFRAME: {
971 switch (cs->ConsumeDestframe(EVBUFFER_INPUT(bev))) {
972 case STATUS_OK: {
973 {
974 struct sockaddr_in sa;
975 memset(&sa, 0, sizeof(sa));
976 sa.sin_port = htons(cs->destport_);
977 if (inet_pton(sa.sin_family = AF_INET,
978 cs->destname_.c_str(),
979 &sa.sin_addr) == 1) {
980 // valid IPv4 address supplied.
981 if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa))) {
982 cs->phase_ = PHASE_WAIT_DESTCONNECT;
983 return;
984 }
985 }
986 }
987 {
988 if (cs->destname_.size() >= 2 &&
989 cs->destname_[0] == '[' &&
990 cs->destname_[cs->destname_.size() - 1] == ']') {
991 // Literal IPv6 address in brackets.
992 cs->destname_ =
993 cs->destname_.substr(1, cs->destname_.size() - 2);
994 }
995 struct sockaddr_in6 sa;
996 memset(&sa, 0, sizeof(sa));
997 sa.sin6_port = htons(cs->destport_);
998 if (inet_pton(sa.sin6_family = AF_INET6,
999 cs->destname_.c_str(),
1000 &sa.sin6_addr) == 1) {
1001 // valid IPv6 address supplied.
1002 if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa))) {
1003 cs->phase_ = PHASE_WAIT_DESTCONNECT;
1004 return;
1005 }
1006 }
1007 }
1008 // Try to asynchronously perform DNS resolution.
1009 evdns_resolve_ipv4(cs->destname_.c_str(), 0,
1010 &OnDestResolutionIPv4, evkey);
1011 evdns_resolve_ipv6(cs->destname_.c_str(), 0,
1012 &OnDestResolutionIPv6, evkey);
1013 cs->phase_ = PHASE_WAIT_DESTCONNECT;
1014 return;
1015 }
1016 case STATUS_INCOMPLETE: {
1017 return;
1018 }
1019 case STATUS_ABORT:
1020 default: {
1021 cs->Shut();
1022 return;
1023 }
1024 }
1025 }
1026 case PHASE_WAIT_DESTCONNECT: {
1027 if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) >=
1028 WebSocketProxy::kReadBufferLimit) {
1029 cs->Shut();
1030 }
1031 return;
1032 }
1033 case PHASE_OUTSIDE_FRAME: {
1034 switch (cs->ConsumeFrameHeader(EVBUFFER_INPUT(bev))) {
1035 case STATUS_OK: {
1036 cs->phase_ = PHASE_INSIDE_FRAME_BASE64;
1037 // Process remaining data if any.
1038 break;
1039 }
1040 case STATUS_SKIP: {
1041 cs->phase_ = PHASE_INSIDE_FRAME_SKIP;
1042 // Process remaining data if any.
1043 break;
1044 }
1045 case STATUS_INCOMPLETE: {
1046 return;
1047 }
1048 case STATUS_ABORT:
1049 default: {
1050 cs->Shut();
1051 return;
1052 }
1053 }
1054 break;
1055 }
1056 case PHASE_INSIDE_FRAME_BASE64:
1057 case PHASE_INSIDE_FRAME_SKIP: {
1058 switch (cs->ProcessFrameData(EVBUFFER_INPUT(bev))) {
1059 case STATUS_OK: {
1060 cs->phase_ = PHASE_OUTSIDE_FRAME;
1061 // Handle remaining data if any.
1062 break;
1063 }
1064 case STATUS_INCOMPLETE: {
1065 return;
1066 }
1067 case STATUS_ABORT:
1068 default: {
1069 cs->Shut();
1070 return;
1071 }
1072 }
1073 break;
1074 }
1075 case PHASE_SHUT: {
1076 evbuffer_drain(EVBUFFER_INPUT(bev),
1077 EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)));
1078 return;
1079 }
1080 case PHASE_DEFUNCT:
1081 default: {
1082 NOTREACHED();
1083 cs->master_->ZapConn(cs);
1084 return;
1085 }
1086 }
1087 }
1088 }
1089
1090 // static
1091 void Conn::OnPrimchanWrite(struct bufferevent* bev, EventKey evkey) {
1092 Conn* cs = Conn::Get(evkey);
1093 if (bev == NULL ||
1094 cs == NULL ||
1095 bev != cs->primchan_.bev()) {
1096 NOTREACHED();
1097 return;
1098 }
1099 cs->primchan_.write_pending() = false;
1100 if (cs->phase_ >= PHASE_SHUT) {
1101 cs->master_->ZapConn(cs);
1102 return;
1103 }
1104 if (cs->phase_ > PHASE_WAIT_DESTCONNECT)
1105 OnDestchanRead(cs->destchan_.bev(), evkey);
1106 if (cs->phase_ >= PHASE_SHUT)
1107 cs->primchan_.Zap();
1108 }
1109
1110 // static
1111 void Conn::OnPrimchanError(struct bufferevent* bev,
1112 short what, EventKey evkey) {
1113 Conn* cs = Conn::Get(evkey);
1114 if (bev == NULL ||
1115 cs == NULL ||
1116 bev != cs->primchan_.bev()) {
1117 return;
1118 }
1119 cs->primchan_.write_pending() = false;
1120 if (cs->phase_ >= PHASE_SHUT)
1121 cs->master_->ZapConn(cs);
1122 else
1123 cs->Shut();
1124 }
1125
1126 // static
1127 void Conn::OnDestResolutionIPv4(int result, char type,
1128 int count, int ttl,
1129 void* addr_list, EventKey evkey) {
1130 Conn* cs = Conn::Get(evkey);
1131 if (cs == NULL)
1132 return;
1133 if (cs->phase_ != PHASE_WAIT_DESTCONNECT)
1134 return;
1135 if (result == DNS_ERR_NONE &&
1136 count >= 1 &&
1137 addr_list != NULL &&
1138 type == DNS_IPv4_A) {
1139 for (int i = 0; i < count; ++i) {
1140 struct sockaddr_in sa;
1141 memset(&sa, 0, sizeof(sa));
1142 sa.sin_family = AF_INET;
1143 sa.sin_port = htons(cs->destport_);
1144 DCHECK(sizeof(sa.sin_addr) == sizeof(struct in_addr));
1145 memcpy(&sa.sin_addr,
1146 static_cast<struct in_addr*>(addr_list) + i,
1147 sizeof(sa.sin_addr));
1148 if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa)))
1149 return;
1150 }
1151 }
1152 cs->destresolution_ipv4_failed_ = true;
1153 if (cs->destresolution_ipv4_failed_ && cs->destresolution_ipv6_failed_)
1154 cs->Shut();
1155 }
1156
1157 // static
1158 void Conn::OnDestResolutionIPv6(int result, char type,
1159 int count, int ttl,
1160 void* addr_list, EventKey evkey) {
1161 Conn* cs = Conn::Get(evkey);
1162 if (cs == NULL)
1163 return;
1164 if (cs->phase_ != PHASE_WAIT_DESTCONNECT)
1165 return;
1166 if (result == DNS_ERR_NONE &&
1167 count >= 1 &&
1168 addr_list != NULL &&
1169 type == DNS_IPv6_AAAA) {
1170 for (int i = 0; i < count; ++i) {
1171 struct sockaddr_in6 sa;
1172 memset(&sa, 0, sizeof(sa));
1173 sa.sin6_family = AF_INET6;
1174 sa.sin6_port = htons(cs->destport_);
1175 DCHECK(sizeof(sa.sin6_addr) == sizeof(struct in6_addr));
1176 memcpy(&sa.sin6_addr,
1177 static_cast<struct in6_addr*>(addr_list) + i,
1178 sizeof(sa.sin6_addr));
1179 if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa)))
1180 return;
1181 }
1182 }
1183 cs->destresolution_ipv6_failed_ = true;
1184 if (cs->destresolution_ipv4_failed_ && cs->destresolution_ipv6_failed_)
1185 cs->Shut();
1186 }
1187
1188 // static
1189 void Conn::OnDestConnectTimeout(int, short, EventKey evkey) {
1190 Conn* cs = Conn::Get(evkey);
1191 if (cs == NULL)
1192 return;
1193 if (cs->phase_ > PHASE_WAIT_DESTCONNECT)
1194 return;
1195 cs->Shut();
1196 }
1197
1198 // static
1199 void Conn::OnDestchanRead(struct bufferevent* bev, EventKey evkey) {
1200 Conn* cs = Conn::Get(evkey);
1201 if (bev == NULL ||
1202 cs == NULL ||
1203 bev != cs->destchan_.bev()) {
1204 NOTREACHED();
1205 return;
1206 }
1207 if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) <= 0)
1208 return;
1209 if (cs->primchan_.bev() == NULL) {
1210 cs->master_->ZapConn(cs);
1211 return;
1212 }
1213 cs->master_->MarkConnImportance(cs, true);
1214 std::string out_bytes;
1215 base::Base64Encode(
1216 std::string(
1217 static_cast<const char*>(static_cast<void*>(
1218 EVBUFFER_DATA(EVBUFFER_INPUT(bev)))),
1219 EVBUFFER_LENGTH(EVBUFFER_INPUT(bev))),
1220 &out_bytes);
1221 evbuffer_drain(EVBUFFER_INPUT(bev), EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)));
1222 static const uint8 frame_header[] = { 0x00 };
1223 static const uint8 frame_terminator[] = { 0xff };
1224 if (!cs->primchan_.Write(frame_header, sizeof(frame_header)) ||
1225 !cs->primchan_.Write(out_bytes.c_str(), out_bytes.size()) ||
1226 !cs->primchan_.Write(frame_terminator, sizeof(frame_terminator))) {
1227 cs->Shut();
1228 }
1229 }
1230
1231 // static
1232 void Conn::OnDestchanWrite(struct bufferevent* bev, EventKey evkey) {
1233 Conn* cs = Conn::Get(evkey);
1234 if (bev == NULL ||
1235 cs == NULL ||
1236 bev != cs->destchan_.bev()) {
1237 NOTREACHED();
1238 return;
1239 }
1240 cs->destchan_.write_pending() = false;
1241 if (cs->phase_ == PHASE_WAIT_DESTCONNECT)
1242 cs->phase_ = PHASE_OUTSIDE_FRAME;
1243 if (cs->phase_ < PHASE_SHUT)
1244 OnPrimchanRead(cs->primchan_.bev(), evkey);
1245 else
1246 cs->destchan_.Zap();
1247 }
1248
1249 // static
1250 void Conn::OnDestchanError(struct bufferevent* bev,
1251 short what, EventKey evkey) {
1252 Conn* cs = Conn::Get(evkey);
1253 if (bev == NULL ||
1254 cs == NULL ||
1255 bev != cs->destchan_.bev()) {
1256 return;
1257 }
1258 cs->destchan_.write_pending() = false;
1259 if (cs->phase_ >= PHASE_SHUT)
1260 cs->master_->ZapConn(cs);
1261 else
1262 cs->Shut();
1263 }
1264
1265 Conn::EventKey Conn::last_evkey_ = 0;
1266 Conn::EventKeyMap Conn::evkey_map_;
1267
1268 } // namespace
1269
1270 WebSocketProxy::WebSocketProxy(
1271 const std::vector<std::string>& allowed_origins,
1272 struct sockaddr* addr, int addr_len)
1273 : impl_(new Serv(allowed_origins, addr, addr_len)) {
1274 }
1275
1276 WebSocketProxy::~WebSocketProxy() {
1277 delete static_cast<Serv*>(impl_);
1278 impl_ = NULL;
1279 }
1280
1281 void WebSocketProxy::Run() {
1282 static_cast<Serv*>(impl_)->Run();
1283 }
1284
1285 void WebSocketProxy::Shutdown() {
1286 static_cast<Serv*>(impl_)->Shutdown();
1287 }
1288
1289 } // namespace chromeos
1290
OLDNEW
« no previous file with comments | « chrome/browser/chromeos/web_socket_proxy.h ('k') | chrome/browser/chromeos/web_socket_proxy_controller.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698