Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(28)

Side by Side Diff: net/tools/ct_mapper/mapper.cc

Issue 1238413004: Framework for iterating over certificates in CT database from Chromium code. (not for review) (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Make samples page work Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/tools/ct_mapper/mapper.h ('k') | net/tools/ct_mapper/metrics.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/tools/ct_mapper/mapper.h"
6
7 #include <deque>
8 #include <iostream>
9
10 #include "base/bind.h"
11 #include "base/files/file_path.h"
12 #include "base/format_macros.h"
13 #include "base/stl_util.h"
14 #include "base/strings/stringprintf.h"
15 #include "base/synchronization/condition_variable.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/time/time.h"
19 #include "net/der/input.h"
20 #include "net/tools/ct_mapper/entry.h"
21 #include "net/tools/ct_mapper/entry_reader.h"
22 #include "net/tools/ct_mapper/metrics.h"
23 #include "net/tools/ct_mapper/visitor.h"
24
25 namespace net {
26
27 namespace {
28
29 // After how many milliseconds of activity to print progress update for the map.
30 const int kProgressTrackerUpdatePeriodMs = 1000;
31
32 class ProgressTracker {
33 public:
34 ProgressTracker()
35 : start_time_(base::TimeTicks::Now()),
36 last_logged_(base::TimeTicks::Now()),
37 logging_interval_(
38 base::TimeDelta::FromMilliseconds(kProgressTrackerUpdatePeriodMs)) {
39 }
40
41 base::TimeDelta GetElapsedTime(const base::TimeTicks& now) const {
42 return now - start_time_;
43 }
44
45 void LogProgress(const base::TimeTicks& now,
46 size_t num_entries,
47 double progress) {
48 last_logged_ = base::TimeTicks::Now();
49
50 double elapsed_sec = GetElapsedTime(now).InSecondsF();
51
52 std::string progress_str = base::StringPrintf("%.3lf", 100.0 * progress);
53
54 std::cerr << "\nMap progress (" << progress_str << "%)\n"
55 << " entries processed: " << num_entries << "\n"
56 << " elapsed time: "
57 << base::StringPrintf("%.3lf", elapsed_sec) << " seconds\n";
58 }
59
60 bool ShouldLogProgress(const base::TimeTicks& now) const {
61 return (now - last_logged_) >= logging_interval_;
62 }
63
64 private:
65 base::TimeTicks start_time_;
66 base::TimeTicks last_logged_;
67 base::TimeDelta logging_interval_;
68
69 DISALLOW_COPY_AND_ASSIGN(ProgressTracker);
70 };
71
72 using Entries = std::vector<Entry>;
73
74 class WorkQueue {
75 public:
76 WorkQueue(size_t max_chunks)
77 : reader_cv_(&lock_), writer_cv_(&lock_), max_chunks_(max_chunks) {}
78
79 void AddChunk(const Entries& entries);
80 bool RemoveChunk(Entries* entries);
81
82 void Complete();
83
84 private:
85 bool CanAddChunk() { return chunks_.size() < max_chunks_; }
86
87 bool CanRemoveChunk() { return complete_ || !chunks_.empty(); }
88
89 base::Lock lock_;
90 base::ConditionVariable reader_cv_;
91 base::ConditionVariable writer_cv_;
92
93 bool complete_ = false;
94
95 std::deque<Entries> chunks_;
96 const size_t max_chunks_;
97 };
98
99 void WorkQueue::AddChunk(const Entries& entries) {
100 base::AutoLock l(lock_);
101
102 DCHECK(!complete_);
103
104 while (!CanAddChunk())
105 writer_cv_.Wait();
106
107 chunks_.push_back(entries);
108 reader_cv_.Signal();
109 }
110
111 bool WorkQueue::RemoveChunk(Entries* entries) {
112 base::AutoLock l(lock_);
113
114 while (!CanRemoveChunk())
115 reader_cv_.Wait();
116
117 if (chunks_.empty()) {
118 DCHECK(complete_);
119 return false;
120 }
121
122 bool writer_is_blocked = !CanAddChunk();
123
124 *entries = std::move(chunks_.front());
125 chunks_.pop_front();
126
127 if (writer_is_blocked && CanAddChunk())
128 writer_cv_.Signal();
129
130 return true;
131 }
132
133 void WorkQueue::Complete() {
134 base::AutoLock l(lock_);
135 complete_ = true;
136 reader_cv_.Broadcast();
137 }
138
139 class Results {
140 public:
141 void Merge(const Metrics& metrics) {
142 base::AutoLock l(lock_);
143 metrics_.Merge(metrics);
144 }
145
146 void CopyTo(Metrics* metrics) {
147 base::AutoLock l(lock_);
148 *metrics = metrics_;
149 }
150
151 private:
152 base::Lock lock_;
153 Metrics metrics_;
154 };
155
156 void WorkerMain(WorkQueue* queue,
157 Results* final_results,
158 VisitorFactory* visitor_factory) {
159 std::unique_ptr<Visitor> visitor = visitor_factory->Create();
160
161 visitor->Start();
162
163 Metrics local_metrics;
164
165 Entries chunk;
166 while (queue->RemoveChunk(&chunk)) {
167 for (const auto& entry : chunk) {
168 visitor->Visit(entry, &local_metrics);
169 }
170 }
171
172 final_results->Merge(local_metrics);
173 }
174
175 std::vector<std::unique_ptr<base::Thread>> StartWorkerThreads(
176 const MapperOptions& options,
177 WorkQueue* queue,
178 Results* results,
179 VisitorFactory* visitor_factory) {
180 std::vector<std::unique_ptr<base::Thread>> threads;
181 threads.reserve(options.num_threads);
182
183 // Initialize the worker threads.
184 for (size_t i = 0; i < options.num_threads; ++i) {
185 std::unique_ptr<base::Thread> thread = base::MakeUnique<base::Thread>(
186 base::StringPrintf("Ct Mapper %" PRIuS, i));
187
188 base::Thread::Options options(base::MessageLoop::TYPE_IO, 0);
189 CHECK(thread->StartWithOptions(options));
190
191 thread->task_runner()->PostTask(
192 FROM_HERE, base::Bind(&WorkerMain, base::Unretained(queue),
193 base::Unretained(results),
194 base::Unretained(visitor_factory)));
195
196 threads.push_back(std::move(thread));
197 }
198
199 return threads;
200 }
201
202 } // namespace
203
204 MapperOptions::MapperOptions()
205 : num_samples_per_bucket(20),
206 num_threads(16),
207 chunk_size(512),
208 max_pending_chunks(32) {}
209
210 size_t ForEachEntry(EntryReader* reader,
211 VisitorFactory* visitor_factory,
212 const MapperOptions& options,
213 Metrics* metrics) {
214 BucketValue::SetMaxSamples(options.num_samples_per_bucket);
215
216 Results results;
217 WorkQueue queue(options.max_pending_chunks);
218
219 std::vector<std::unique_ptr<base::Thread>> worker_threads =
220 StartWorkerThreads(options, &queue, &results, visitor_factory);
221
222 ProgressTracker progress;
223
224 size_t total_entries_read = 0;
225
226 Entries chunk;
227 while (reader->Read(&chunk, options.chunk_size)) {
228 total_entries_read += chunk.size();
229 queue.AddChunk(chunk);
230
231 base::TimeTicks now = base::TimeTicks::Now();
232 if (progress.ShouldLogProgress(now)) {
233 progress.LogProgress(now, total_entries_read, reader->GetProgress());
234 }
235
236 if (!options.max_elapsed_time.is_zero() &&
237 (progress.GetElapsedTime(now) >= options.max_elapsed_time)) {
238 break;
239 }
240 }
241
242 queue.Complete();
243
244 // Wait for the workers to finish.
245 worker_threads.clear();
246
247 std::cerr << "\nDONE\n";
248 progress.LogProgress(base::TimeTicks::Now(), total_entries_read, 1);
249 std::cerr << "\n";
250
251 results.CopyTo(metrics);
252 metrics->Finalize();
253 return total_entries_read;
254 }
255
256 } // namespace net
OLDNEW
« no previous file with comments | « net/tools/ct_mapper/mapper.h ('k') | net/tools/ct_mapper/metrics.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698