Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(173)

Side by Side Diff: net/reporting/reporting_delivery_agent.cc

Issue 2785293003: Reporting: Make DeliveryAgent self-scheduling. (Closed)
Patch Set: rebase Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The Chromium Authors. All rights reserved. 1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/reporting/reporting_delivery_agent.h" 5 #include "net/reporting/reporting_delivery_agent.h"
6 6
7 #include <map> 7 #include <map>
8 #include <string> 8 #include <string>
9 #include <vector> 9 #include <vector>
10 10
11 #include "base/bind.h" 11 #include "base/bind.h"
12 #include "base/json/json_writer.h" 12 #include "base/json/json_writer.h"
13 #include "base/logging.h" 13 #include "base/logging.h"
14 #include "base/memory/ptr_util.h" 14 #include "base/memory/ptr_util.h"
15 #include "base/time/tick_clock.h" 15 #include "base/time/tick_clock.h"
16 #include "base/timer/timer.h"
16 #include "base/values.h" 17 #include "base/values.h"
17 #include "net/reporting/reporting_cache.h" 18 #include "net/reporting/reporting_cache.h"
18 #include "net/reporting/reporting_endpoint_manager.h" 19 #include "net/reporting/reporting_endpoint_manager.h"
20 #include "net/reporting/reporting_observer.h"
19 #include "net/reporting/reporting_report.h" 21 #include "net/reporting/reporting_report.h"
20 #include "net/reporting/reporting_uploader.h" 22 #include "net/reporting/reporting_uploader.h"
21 #include "url/gurl.h" 23 #include "url/gurl.h"
22 #include "url/origin.h" 24 #include "url/origin.h"
23 25
24 namespace net { 26 namespace net {
25 27
26 namespace { 28 namespace {
27 29
28 void SerializeReports(const std::vector<const ReportingReport*>& reports, 30 void SerializeReports(const std::vector<const ReportingReport*>& reports,
(...skipping 10 matching lines...) Expand all
39 report_value->SetString("url", report->url.spec()); 41 report_value->SetString("url", report->url.spec());
40 report_value->Set("report", report->body->DeepCopy()); 42 report_value->Set("report", report->body->DeepCopy());
41 43
42 reports_value.Append(std::move(report_value)); 44 reports_value.Append(std::move(report_value));
43 } 45 }
44 46
45 bool json_written = base::JSONWriter::Write(reports_value, json_out); 47 bool json_written = base::JSONWriter::Write(reports_value, json_out);
46 DCHECK(json_written); 48 DCHECK(json_written);
47 } 49 }
48 50
51 class ReportingDeliveryAgentImpl : public ReportingDeliveryAgent,
52 public ReportingObserver {
53 public:
54 ReportingDeliveryAgentImpl(ReportingContext* context)
55 : context_(context),
56 timer_(base::MakeUnique<base::OneShotTimer>()),
57 weak_factory_(this) {
58 context_->AddObserver(this);
59 }
60
61 // ReportingDeliveryAgent implementation:
62
63 ~ReportingDeliveryAgentImpl() override { context_->RemoveObserver(this); }
64
65 void Initialize() override {
66 if (CacheHasReports())
67 StartTimer();
68 }
69
70 void SetTimerForTesting(std::unique_ptr<base::Timer> timer) override {
71 DCHECK(!timer_->IsRunning());
72 timer_ = std::move(timer);
73 }
74
75 // ReportingObserver implementation:
76 void OnCacheUpdated() override {
77 if (CacheHasReports())
78 StartTimer();
79 }
80
81 private:
82 class Delivery {
83 public:
84 Delivery(const GURL& endpoint,
85 const std::vector<const ReportingReport*>& reports)
86 : endpoint(endpoint), reports(reports) {}
87
88 ~Delivery() {}
89
90 const GURL endpoint;
91 const std::vector<const ReportingReport*> reports;
92 };
93
94 using OriginGroup = std::pair<url::Origin, std::string>;
95
96 bool CacheHasReports() {
97 std::vector<const ReportingReport*> reports;
98 context_->cache()->GetReports(&reports);
99 return !reports.empty();
100 }
101
102 void StartTimer() {
103 timer_->Start(FROM_HERE, policy().delivery_interval,
104 base::Bind(&ReportingDeliveryAgentImpl::OnTimerFired,
105 base::Unretained(this)));
106 }
107
108 void OnTimerFired() {
109 if (CacheHasReports()) {
110 SendReports();
111 StartTimer();
112 }
113 }
114
115 void SendReports() {
116 std::vector<const ReportingReport*> reports;
117 cache()->GetReports(&reports);
118
119 // Sort reports into (origin, group) buckets.
120 std::map<OriginGroup, std::vector<const ReportingReport*>>
121 origin_group_reports;
122 for (const ReportingReport* report : reports) {
123 OriginGroup origin_group(url::Origin(report->url), report->group);
124 origin_group_reports[origin_group].push_back(report);
125 }
126
127 // Find endpoint for each (origin, group) bucket and sort reports into
128 // endpoint buckets. Don't allow concurrent deliveries to the same (origin,
129 // group) bucket.
130 std::map<GURL, std::vector<const ReportingReport*>> endpoint_reports;
131 for (auto& it : origin_group_reports) {
132 const OriginGroup& origin_group = it.first;
133
134 if (base::ContainsKey(pending_origin_groups_, origin_group))
135 continue;
136
137 GURL endpoint_url;
138 if (!endpoint_manager()->FindEndpointForOriginAndGroup(
139 origin_group.first, origin_group.second, &endpoint_url)) {
140 continue;
141 }
142
143 endpoint_reports[endpoint_url].insert(
144 endpoint_reports[endpoint_url].end(), it.second.begin(),
145 it.second.end());
146 pending_origin_groups_.insert(origin_group);
147 }
148
149 // Start a delivery to each endpoint.
150 for (auto& it : endpoint_reports) {
151 const GURL& endpoint = it.first;
152 const std::vector<const ReportingReport*>& reports = it.second;
153
154 endpoint_manager()->SetEndpointPending(endpoint);
155 cache()->SetReportsPending(reports);
156
157 std::string json;
158 SerializeReports(reports, tick_clock()->NowTicks(), &json);
159
160 uploader()->StartUpload(
161 endpoint, json,
162 base::Bind(&ReportingDeliveryAgentImpl::OnUploadComplete,
163 weak_factory_.GetWeakPtr(),
164 base::MakeUnique<Delivery>(endpoint, reports)));
165 }
166 }
167
168 void OnUploadComplete(const std::unique_ptr<Delivery>& delivery,
169 ReportingUploader::Outcome outcome) {
170 if (outcome == ReportingUploader::Outcome::SUCCESS) {
171 cache()->RemoveReports(delivery->reports);
172 endpoint_manager()->InformOfEndpointRequest(delivery->endpoint, true);
173 } else {
174 cache()->IncrementReportsAttempts(delivery->reports);
175 endpoint_manager()->InformOfEndpointRequest(delivery->endpoint, false);
176 }
177
178 if (outcome == ReportingUploader::Outcome::REMOVE_ENDPOINT)
179 cache()->RemoveClientsForEndpoint(delivery->endpoint);
180
181 for (const ReportingReport* report : delivery->reports) {
182 pending_origin_groups_.erase(
183 OriginGroup(url::Origin(report->url), report->group));
184 }
185
186 endpoint_manager()->ClearEndpointPending(delivery->endpoint);
187 cache()->ClearReportsPending(delivery->reports);
188 }
189
190 const ReportingPolicy& policy() { return context_->policy(); }
191 base::TickClock* tick_clock() { return context_->tick_clock(); }
192 ReportingCache* cache() { return context_->cache(); }
193 ReportingUploader* uploader() { return context_->uploader(); }
194 ReportingEndpointManager* endpoint_manager() {
195 return context_->endpoint_manager();
196 }
197
198 ReportingContext* context_;
199
200 std::unique_ptr<base::Timer> timer_;
201
202 // Tracks OriginGroup tuples for which there is a pending delivery running.
203 // (Would be an unordered_set, but there's no hash on pair.)
204 std::set<OriginGroup> pending_origin_groups_;
205
206 base::WeakPtrFactory<ReportingDeliveryAgentImpl> weak_factory_;
207
208 DISALLOW_COPY_AND_ASSIGN(ReportingDeliveryAgentImpl);
209 };
210
49 } // namespace 211 } // namespace
50 212
51 ReportingDeliveryAgent::ReportingDeliveryAgent(ReportingContext* context) 213 // static
52 : context_(context), weak_factory_(this) {} 214 std::unique_ptr<ReportingDeliveryAgent> ReportingDeliveryAgent::Create(
215 ReportingContext* context) {
216 return base::MakeUnique<ReportingDeliveryAgentImpl>(context);
217 }
218
53 ReportingDeliveryAgent::~ReportingDeliveryAgent() {} 219 ReportingDeliveryAgent::~ReportingDeliveryAgent() {}
54 220
55 class ReportingDeliveryAgent::Delivery {
56 public:
57 Delivery(const GURL& endpoint,
58 const std::vector<const ReportingReport*>& reports)
59 : endpoint(endpoint), reports(reports) {}
60
61 ~Delivery() {}
62
63 const GURL endpoint;
64 const std::vector<const ReportingReport*> reports;
65 };
66
67 void ReportingDeliveryAgent::SendReports() {
68 std::vector<const ReportingReport*> reports;
69 cache()->GetReports(&reports);
70
71 // Sort reports into (origin, group) buckets.
72 std::map<OriginGroup, std::vector<const ReportingReport*>>
73 origin_group_reports;
74 for (const ReportingReport* report : reports) {
75 OriginGroup origin_group(url::Origin(report->url), report->group);
76 origin_group_reports[origin_group].push_back(report);
77 }
78
79 // Find endpoint for each (origin, group) bucket and sort reports into
80 // endpoint buckets. Don't allow concurrent deliveries to the same (origin,
81 // group) bucket.
82 std::map<GURL, std::vector<const ReportingReport*>> endpoint_reports;
83 for (auto& it : origin_group_reports) {
84 const OriginGroup& origin_group = it.first;
85
86 if (base::ContainsKey(pending_origin_groups_, origin_group))
87 continue;
88
89 GURL endpoint_url;
90 if (!endpoint_manager()->FindEndpointForOriginAndGroup(
91 origin_group.first, origin_group.second, &endpoint_url)) {
92 continue;
93 }
94
95 endpoint_reports[endpoint_url].insert(endpoint_reports[endpoint_url].end(),
96 it.second.begin(), it.second.end());
97 pending_origin_groups_.insert(origin_group);
98 }
99
100 // Start a delivery to each endpoint.
101 for (auto& it : endpoint_reports) {
102 const GURL& endpoint = it.first;
103 const std::vector<const ReportingReport*>& reports = it.second;
104
105 endpoint_manager()->SetEndpointPending(endpoint);
106 cache()->SetReportsPending(reports);
107
108 std::string json;
109 SerializeReports(reports, tick_clock()->NowTicks(), &json);
110
111 uploader()->StartUpload(
112 endpoint, json,
113 base::Bind(&ReportingDeliveryAgent::OnUploadComplete,
114 weak_factory_.GetWeakPtr(),
115 base::MakeUnique<Delivery>(endpoint, reports)));
116 }
117 }
118
119 void ReportingDeliveryAgent::OnUploadComplete(
120 const std::unique_ptr<Delivery>& delivery,
121 ReportingUploader::Outcome outcome) {
122 if (outcome == ReportingUploader::Outcome::SUCCESS) {
123 cache()->RemoveReports(delivery->reports);
124 endpoint_manager()->InformOfEndpointRequest(delivery->endpoint, true);
125 } else {
126 cache()->IncrementReportsAttempts(delivery->reports);
127 endpoint_manager()->InformOfEndpointRequest(delivery->endpoint, false);
128 }
129
130 if (outcome == ReportingUploader::Outcome::REMOVE_ENDPOINT)
131 cache()->RemoveClientsForEndpoint(delivery->endpoint);
132
133 for (const ReportingReport* report : delivery->reports) {
134 pending_origin_groups_.erase(
135 OriginGroup(url::Origin(report->url), report->group));
136 }
137
138 endpoint_manager()->ClearEndpointPending(delivery->endpoint);
139 cache()->ClearReportsPending(delivery->reports);
140 }
141
142 } // namespace net 221 } // namespace net
OLDNEW
« no previous file with comments | « net/reporting/reporting_delivery_agent.h ('k') | net/reporting/reporting_delivery_agent_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698