Index: libcurl_http_fetcher.cc
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index 9ed0c64b90341eaeb17777e24bc84293da5e4b56..7a032fe460fb18cd54f49be99c71c1a65917c6ae 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -19,6 +19,7 @@ LibcurlHttpFetcher::~LibcurlHttpFetcher() {
}

void LibcurlHttpFetcher::ResumeTransfer(const std::string& url) {
+ LOG(INFO) << "Starting/Resuming transfer";
CHECK(!transfer_in_progress_);
url_ = url;
curl_multi_handle_ = curl_multi_init();
@@ -49,9 +50,15 @@ void LibcurlHttpFetcher::ResumeTransfer(const std::string& url) {
CHECK_EQ(curl_easy_setopt(curl_handle_, CURLOPT_WRITEFUNCTION,
StaticLibcurlWrite), CURLE_OK);
CHECK_EQ(curl_easy_setopt(curl_handle_, CURLOPT_URL, url_.c_str()), CURLE_OK);
+
+ // If the connection drops under 10 bytes/sec for 90 seconds, reconnect.
+ CHECK_EQ(curl_easy_setopt(curl_handle_, CURLOPT_LOW_SPEED_LIMIT, 10),
+ CURLE_OK);
+ CHECK_EQ(curl_easy_setopt(curl_handle_, CURLOPT_LOW_SPEED_TIME, 90),
+ CURLE_OK);
+
CHECK_EQ(curl_multi_add_handle(curl_multi_handle_, curl_handle_), CURLM_OK);
transfer_in_progress_ = true;
- CurlPerformOnce();
}

// Begins the transfer, which must not have already been started.
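The two options added above are libcurl's low-speed abort pair: if the average transfer rate stays below CURLOPT_LOW_SPEED_LIMIT (bytes/sec) for CURLOPT_LOW_SPEED_TIME seconds, curl aborts the transfer with CURLE_OPERATION_TIMEDOUT, which is what lets the fetcher notice a dead connection and reconnect. Below is a minimal standalone sketch of the same pair on the easy interface, not this class's code; the URL is a placeholder, and the L suffixes reflect that libcurl documents both options as taking long values.

// Standalone sketch (not part of the fetcher): the low-speed abort pair on
// the libcurl easy interface. The URL is a placeholder.
#include <cstdio>
#include <curl/curl.h>

int main() {
  curl_global_init(CURL_GLOBAL_ALL);
  CURL* handle = curl_easy_init();
  if (!handle)
    return 1;
  curl_easy_setopt(handle, CURLOPT_URL, "http://example.com/big_payload");
  // Abort if the average transfer rate stays below 10 bytes/sec for 90 s.
  curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, 10L);
  curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, 90L);
  CURLcode res = curl_easy_perform(handle);
  if (res == CURLE_OPERATION_TIMEDOUT) {
    // This is the signal a caller would use to tear down and reconnect.
    printf("transfer stalled; reconnect\n");
  }
  curl_easy_cleanup(handle);
  curl_global_cleanup();
  return res == CURLE_OK ? 0 : 1;
}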
@@ -59,15 +66,16 @@ void LibcurlHttpFetcher::BeginTransfer(const std::string& url) {
transfer_size_ = -1;
bytes_downloaded_ = 0;
resume_offset_ = 0;
- ResumeTransfer(url);
+ do {
+ ResumeTransfer(url);
+ } while (CurlPerformOnce());
}

void LibcurlHttpFetcher::TerminateTransfer() {
CleanUp();
}

-// TODO(adlr): detect network failures
-void LibcurlHttpFetcher::CurlPerformOnce() {
+bool LibcurlHttpFetcher::CurlPerformOnce() {
CHECK(transfer_in_progress_);
int running_handles = 0;
CURLMcode retcode = CURLM_CALL_MULTI_PERFORM;
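BeginTransfer now loops: ResumeTransfer() starts a fresh attempt, and CurlPerformOnce() returns true whenever the attempt ended before the whole payload arrived. These hunks don't show how an attempt picks up at resume_offset_; one common libcurl way to do that is a ranged restart, sketched below as a hypothetical blocking retry loop under that assumption (placeholder URL, write callback, and attempt limit), not this class's code.

// Hypothetical standalone sketch: a blocking retry loop in the same spirit
// as BeginTransfer above. Each attempt resumes at the byte offset reached so
// far via CURLOPT_RESUME_FROM_LARGE.
#include <cstdio>
#include <curl/curl.h>

static curl_off_t bytes_downloaded = 0;

static size_t CountBytes(void* ptr, size_t size, size_t nmemb, void* userdata) {
  (void)ptr;
  (void)userdata;
  bytes_downloaded += static_cast<curl_off_t>(size * nmemb);
  return size * nmemb;  // Tell libcurl we consumed everything.
}

int main() {
  curl_global_init(CURL_GLOBAL_ALL);
  CURLcode res = CURLE_FAILED_INIT;
  for (int attempt = 0; attempt < 5; ++attempt) {
    CURL* handle = curl_easy_init();
    if (!handle)
      break;
    curl_easy_setopt(handle, CURLOPT_URL, "http://example.com/big_payload");
    curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CountBytes);
    // Resume where the previous (possibly aborted) attempt stopped.
    curl_easy_setopt(handle, CURLOPT_RESUME_FROM_LARGE, bytes_downloaded);
    res = curl_easy_perform(handle);
    curl_easy_cleanup(handle);
    if (res == CURLE_OK)
      break;  // Whole payload received.
  }
  printf("downloaded %lld bytes, final result %d\n",
         static_cast<long long>(bytes_downloaded), static_cast<int>(res));
  curl_global_cleanup();
  return res == CURLE_OK ? 0 : 1;
}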
@@ -82,7 +90,8 @@ void LibcurlHttpFetcher::CurlPerformOnce() {
CleanUp();
if ((transfer_size_ >= 0) && (bytes_downloaded_ < transfer_size_)) {
- ResumeTransfer(url_);
+ // Need to restart transfer
+ return true;
} else {
if (delegate_) {
delegate_->TransferComplete(this, true); // success
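CurlPerformOnce() now reports through its bool return whether the transfer stopped short and needs a restart; the class decides that by comparing bytes_downloaded_ against transfer_size_. The sketch below shows one "perform" step under the assumption of a CURLM* with a single easy handle attached; instead of the byte-count comparison it uses curl_multi_info_read(), an alternative way to spot a failed attempt (for example CURLE_OPERATION_TIMEDOUT from the low-speed options above).

// Sketch, not this class's code: drain curl_multi_perform and inspect why
// the transfer stopped.
#include <curl/curl.h>

// Returns true if the caller should tear down and restart the transfer.
bool PerformOnce(CURLM* multi_handle) {
  int running_handles = 0;
  CURLMcode retcode = CURLM_CALL_MULTI_PERFORM;
  while (retcode == CURLM_CALL_MULTI_PERFORM)
    retcode = curl_multi_perform(multi_handle, &running_handles);
  if (running_handles != 0)
    return false;  // Still in flight; wait for the next fd/timeout event.
  int msgs_left = 0;
  while (CURLMsg* msg = curl_multi_info_read(multi_handle, &msgs_left)) {
    if (msg->msg == CURLMSG_DONE && msg->data.result != CURLE_OK)
      return true;  // Finished with an error (e.g. stalled); restart.
  }
  return false;  // Finished cleanly.
}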
@@ -92,6 +101,7 @@ void LibcurlHttpFetcher::CurlPerformOnce() {
// set up callback
SetupMainloopSources();
}
+ return false;
}

size_t LibcurlHttpFetcher::LibcurlWrite(void *ptr, size_t size, size_t nmemb) {
@@ -163,7 +173,6 @@ void LibcurlHttpFetcher::SetupMainloopSources() {
// If we are already tracking this fd, continue.
if (io_channels_.find(i) != io_channels_.end())
continue;
-
// We must track a new fd
GIOChannel *io_channel = g_io_channel_unix_new(i);
guint tag = g_io_add_watch(
@@ -173,6 +182,11 @@ void LibcurlHttpFetcher::SetupMainloopSources() {
&StaticFDCallback,
this);
io_channels_[i] = make_pair(io_channel, tag);
+ static int io_counter = 0;
+ io_counter++;
+ if (io_counter % 50 == 0) {
+ LOG(INFO) << "io_counter = " << io_counter;
+ }
}

// Set up a timeout callback for libcurl
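For context, the loop above walks curl's file descriptors and registers a GLib watch for any fd it isn't tracking yet. A hypothetical self-contained version of that wiring is sketched below; WatchCurlFds, OnFdReady, and the condition mask are assumptions, not this file's names. Real code, like the class here, would also keep each channel and watch tag around so they can be removed and unreffed later.

// Hypothetical sketch: wire curl's file descriptors into the GLib main loop.
#include <glib.h>
#include <curl/curl.h>
#include <sys/select.h>

void WatchCurlFds(CURLM* multi_handle, GIOFunc on_fd_ready, gpointer user_data) {
  fd_set read_set, write_set, exc_set;
  FD_ZERO(&read_set);
  FD_ZERO(&write_set);
  FD_ZERO(&exc_set);
  int max_fd = -1;
  curl_multi_fdset(multi_handle, &read_set, &write_set, &exc_set, &max_fd);
  for (int fd = 0; fd <= max_fd; ++fd) {
    if (!FD_ISSET(fd, &read_set) && !FD_ISSET(fd, &write_set))
      continue;
    // Wrap the raw fd in a GIOChannel and ask GLib to call us back when it
    // becomes readable/writable (or errors out). A real caller would store
    // the channel and the returned tag for later cleanup.
    GIOChannel* channel = g_io_channel_unix_new(fd);
    g_io_add_watch(channel,
                   static_cast<GIOCondition>(G_IO_IN | G_IO_OUT | G_IO_ERR |
                                             G_IO_HUP),
                   on_fd_ready,
                   user_data);
  }
}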
@@ -186,51 +200,46 @@ void LibcurlHttpFetcher::SetupMainloopSources() {
// curl_multi_perform() again.
ms = idle_ms_;
}
- if (timeout_source_) {
- g_source_destroy(timeout_source_);
- timeout_source_ = NULL;
+ if (!timeout_source_) {
+ LOG(INFO) << "setting up timeout source:" << ms;
+ timeout_source_ = g_timeout_source_new(1000);
+ CHECK(timeout_source_);
+ g_source_set_callback(timeout_source_, StaticTimeoutCallback, this,
+ NULL);
+ g_source_attach(timeout_source_, NULL);
+ static int counter = 0;
+ counter++;
+ if (counter % 50 == 0) {
+ LOG(INFO) << "counter = " << counter;
+ }
}
- timeout_source_ = g_timeout_source_new(ms);
- CHECK(timeout_source_);
- g_source_set_callback(timeout_source_, StaticTimeoutCallback, this,
- NULL);
- g_source_attach(timeout_source_, NULL);
}

bool LibcurlHttpFetcher::FDCallback(GIOChannel *source,
GIOCondition condition) {
- // Figure out which source it was; hopefully there aren't too many b/c
- // this is a linear scan of our channels
- bool found_in_set = false;
- for (IOChannels::iterator it = io_channels_.begin();
- it != io_channels_.end(); ++it) {
- if (it->second.first == source) {
- // We will return false from this method, meaning that we shouldn't keep
- // this g_io_channel around. So we remove it now from our collection of
- // g_io_channels so that the other code in this class doens't mess with
- // this (doomed) GIOChannel.
- // TODO(adlr): optimize by seeing if we should reuse this GIOChannel
- g_source_remove(it->second.second);
- g_io_channel_unref(it->second.first);
- io_channels_.erase(it);
- found_in_set = true;
- break;
- }
+ while (CurlPerformOnce()) {
+ ResumeTransfer(url_);
}
- CHECK(found_in_set);
- CurlPerformOnce();
- return false;
+ // We handle removal of this source elsewhere, so we always return true.
+ // The docs say, "the function should return FALSE if the event source
+ // should be removed."
+ // http://www.gtk.org/api/2.6/glib/glib-IO-Channels.html#GIOFunc
+ return true;
}

-bool LibcurlHttpFetcher::TimeoutCallback() {
+gboolean LibcurlHttpFetcher::TimeoutCallback() {
+ if (!transfer_in_progress_)
+ return TRUE;
// Since we will return false from this function, which tells glib to
// destroy the timeout callback, we must NULL it out here. This way, when
// setting up callback sources again, we won't try to delete this (doomed)
// timeout callback then.
// TODO(adlr): optimize by checking if we can keep this timeout callback.
- timeout_source_ = NULL;
- CurlPerformOnce();
- return false;
+ //timeout_source_ = NULL;
+ while (CurlPerformOnce()) {
+ ResumeTransfer(url_);
+ }
+ return TRUE;
}

void LibcurlHttpFetcher::CleanUp() {
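The last hunk switches both callbacks to return TRUE, so GLib keeps the sources installed instead of destroying them after each firing, and the timeout source is now created once with a fixed 1000 ms interval. A minimal sketch of that persistent-timeout pattern follows; the names (OnTimeout, the main()) are placeholders, not this class's methods.

// Minimal sketch of a persistent GLib timeout source.
#include <glib.h>

static gboolean OnTimeout(gpointer user_data) {
  // In the fetcher, this is where curl_multi_perform would be driven again.
  (void)user_data;
  // Returning TRUE keeps the timeout source installed; returning FALSE would
  // make GLib remove it, which is why the old code had to NULL out its
  // timeout_source_ pointer every time.
  return TRUE;
}

int main() {
  GMainLoop* loop = g_main_loop_new(NULL, FALSE);
  // Fire once a second, matching the fixed 1000 ms interval above.
  GSource* timeout_source = g_timeout_source_new(1000);
  g_source_set_callback(timeout_source, OnTimeout, NULL, NULL);
  g_source_attach(timeout_source, NULL);  // NULL == default main context.
  g_main_loop_run(loop);  // Runs until something calls g_main_loop_quit().
  return 0;
}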