Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(4)

Side by Side Diff: golden/go/pdfxform/main.go

Issue 1216483002: golden/pdfxform a pdf rasterization server (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: 2015-07-09 (Thursday) 13:32:58 EDT Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « go/pdf/testdata/minimal.pdf ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // pdfxform is a server that rasterizes PDF documents into PNG
2 package main
3
4 import (
5 "bytes"
6 "crypto/md5"
7 "encoding/hex"
8 "encoding/json"
9 "flag"
10 "fmt"
11 "io"
12 "io/ioutil"
13 "net/http"
14 "os"
15 "os/user"
16 "path"
17 "path/filepath"
18 "strings"
19 "time"
20
21 "github.com/skia-dev/glog"
22 "go.skia.org/infra/go/auth"
23 "go.skia.org/infra/go/common"
24 "go.skia.org/infra/go/gs"
25 "go.skia.org/infra/go/pdf"
26 "go.skia.org/infra/go/util"
27 "go.skia.org/infra/perf/go/goldingester"
28 "google.golang.org/api/storage/v1"
29 )
30
31 ////////////////////////////////////////////////////////////////////////////////
32
// File extensions (without the leading dot) of the artifacts handled by
// this server.
const (
	// PNG_EXT is the extension given to rasterized output images.
	PNG_EXT = "png"
	// PDF_EXT is the extension that marks a result as a PDF input.
	PDF_EXT = "pdf"
)
37
38 ////////////////////////////////////////////////////////////////////////////////
39
40 // md5OfFile calculates the MD5 checksum of a file.
41 func md5OfFile(path string) (string, error) {
42 md5 := md5.New()
43 f, err := os.Open(path)
44 if err != nil {
45 return "", err
46 }
47 defer util.Close(f)
48 if _, err = io.Copy(md5, f); err != nil {
49 return "", err
50 }
51 return hex.EncodeToString(md5.Sum(nil)), nil
52 }
53
54 // removeIfExists is like util.Remove, but logs no error if the file does not ex ist.
55 func removeIfExists(path string) {
56 if err := os.Remove(path); err != nil {
57 if !os.IsNotExist(err) {
58 glog.Errorf("Failed to Remove(%s): %v", path, err)
59 }
60 }
61 }
62
63 // isPDF returns true if the path appears to point to a PDF file.
64 func isPDF(path string) bool {
65 f, err := os.Open(path)
66 if err != nil {
67 return false
68 }
69 defer util.Close(f)
70 var buffer [4]byte
stephana 2015/07/09 17:55:45 buffer := make([]byte, 4) ... f.Read(buffer)....
hal.canary 2015/07/09 17:58:38 Done.
71 if n, err := f.Read(buffer[:]); n != 4 || err != nil {
72 return false
73 }
74 var magic = [4]byte{'%', 'P', 'D', 'F'}
75 return bytes.Equal(magic[:], buffer[:])
stephana 2015/07/09 17:55:45 return string(buffer) == "%PDF"
hal.canary 2015/07/09 17:58:38 Done.
76 }
77
78 // writeTo opens a file and dumps the contents of the reader into it.
79 func writeTo(path string, reader *io.ReadCloser) error {
80 defer util.Close(*reader)
81 file, err := os.Create(path)
82 if err == nil {
83 _, err = io.Copy(file, *reader)
84 }
85 return err
86 }
87
88 ////////////////////////////////////////////////////////////////////////////////
89
90 // storageClient struct is used for uploading to cloud storage
91 type storageClient struct {
92 httpClient *http.Client
93 storageService *storage.Service
94 }
95
96 // getClient returns an authorized storage.Service and the
97 // corresponding http.Client; if anything goes wrong, it logs a fatal
98 // error.
99 func getClient() (storageClient, error) {
100 var client *http.Client
101 var err error
102 if *local {
103 client, err = auth.RunFlow(auth.OAuthConfig(*oauthCacheFile, aut h.SCOPE_FULL_CONTROL))
104 // TODO(stephana): Replace auth.RunFlow with auth.NewClient
105 // client, err = auth.NewClient(true, *oauthCacheFile, auth.SCOP E_FULL_CONTROL, auth.SCOPE_GCE)
106 } else {
107 client = auth.GCEServiceAccountClient(&http.Transport{Dial: util .DialTimeout})
108 }
109 if err != nil {
110 return storageClient{}, err
111 }
112 gsService, err := storage.New(client)
113 if err != nil {
114 return storageClient{}, err
115 }
116 return storageClient{httpClient: client, storageService: gsService}, nil
117 }
118
119 // gsFetch fetch the object's data from google storage
120 func gsFetch(object *storage.Object, sc storageClient) (io.ReadCloser, int64, er ror) {
121 request, err := gs.RequestForStorageURL(object.MediaLink)
122 if err != nil {
123 return nil, -1, err
124 }
125 resp, err := sc.httpClient.Do(request)
126 if err != nil {
127 return nil, -1, err
128 }
129 if resp.StatusCode != 200 {
130 resp.Body.Close()
131 return nil, -1, fmt.Errorf("Failed to retrieve: %s %d %s", objec t.MediaLink, resp.StatusCode, resp.Status)
132 }
133 return resp.Body, resp.ContentLength, nil
134 }
135
136 // uploadFile uploads the specified file to the remote dir in Google
137 // Storage. It also sets the appropriate ACLs on the uploaded file.
138 // If the file already exists on the server, do nothing.
139 func uploadFile(sc storageClient, input io.Reader, storageBucket, storagePath, a ccessControlEntity string) (bool, error) {
140 obj, _ := sc.storageService.Objects.Get(storageBucket, storagePath).Do()
141 if obj != nil {
142 return false, nil // noclobber
143 }
144 fullPath := fmt.Sprintf("gs://%s/%s", storageBucket, storagePath)
145 object := &storage.Object{Name: storagePath}
146 if _, err := sc.storageService.Objects.Insert(storageBucket, object).Med ia(input).Do(); err != nil {
147 return false, fmt.Errorf("Objects.Insert(%s) failed: %s", fullPa th, err)
148 }
149 objectAcl := &storage.ObjectAccessControl{
150 Bucket: storageBucket, Entity: accessControlEntity, Object: stor agePath, Role: "READER",
151 }
152 if _, err := sc.storageService.ObjectAccessControls.Insert(storageBucket , storagePath, objectAcl).Do(); err != nil {
153 return false, fmt.Errorf("Could not update ACL of %s: %s", fullP ath, err)
154 }
155 return true, nil
156 }
157
158 ////////////////////////////////////////////////////////////////////////////////
159
// Command-line flags. The defaults select the production bucket and
// directories; --failure_image has no usable default and must be given.
var (
	local                  = flag.Bool("local", false, "Set to true if not running in prod")
	oauthCacheFile         = flag.String("oauth_cache_file", "oauth_cache.dat", "Path to look for and store an OAuth token")
	dataDir                = flag.String("data_dir", "", "Directory to store data in.")
	failureImage           = flag.String("failure_image", "", "Location of a PNG image; must be set")
	storageBucket          = flag.String("storage_bucket", "chromium-skia-gm", "The bucket for json, pdf, and png files")
	storageJsonDirectory   = flag.String("storage_json_directory", "dm-json-v1", "The directory on bucket for json files.")
	storageImagesDirectory = flag.String("storage_images_directory", "dm-images-v1", "The directory on bucket for png and pdf files.")
	accessControlEntity    = flag.String("access_control_entity", "domain-google.com", "The entity that has permissions to manage the bucket")
	graphiteServer         = flag.String("graphite_server", "skia-monitoring:2003", "Where the Graphite metrics ingestion server is running")
)
171
172 // The pdfXformer struct holds state
173 type pdfXformer struct {
174 client storageClient
175 rasterizers []pdf.Rasterizer
176 results map[string]map[int]string
177 counter int
178 identifier string
179 errorImageMd5 string
180 }
181
182 // rasterizeOnce applies a single rastetizer to the given pdf file.
183 // If the rasterizer fails, use the errorImage. If everything
184 // succeeds, upload the PNG.
185 func (xformer *pdfXformer) rasterizeOnce(pdfPath string, rasterizerIndex int) (s tring, error) {
186 rasterizer := xformer.rasterizers[rasterizerIndex]
187 tempdir := filepath.Dir(pdfPath)
188 pngPath := path.Join(tempdir, fmt.Sprintf("%s.%s", rasterizer.String(), PNG_EXT))
189 defer removeIfExists(pngPath)
190 glog.Infof("> > > > rasterizing with %s", rasterizer)
191 err := rasterizer.Rasterize(pdfPath, pngPath)
192 if err != nil {
193 glog.Warningf("rasterizing %s with %s failed: %s", filepath.Base (pdfPath), rasterizer.String(), err)
194 return xformer.errorImageMd5, nil
195 }
196 md5, err := md5OfFile(pngPath)
197 if err != nil {
198 return "", err
199 }
200 f, err := os.Open(pngPath)
201 if err != nil {
202 return "", err
203 }
204 defer util.Close(f)
205 pngUploadPath := fmt.Sprintf("%s/%s.%s", *storageImagesDirectory, md5, P NG_EXT)
206 didUpload, err := uploadFile(xformer.client, f, *storageBucket, pngUploa dPath, *accessControlEntity)
207 if err != nil {
208 return "", err
209 }
210 if didUpload {
211 glog.Infof("> > > > uploaded %s", pngUploadPath)
212 }
213 return md5, nil
214 }
215
216 // makeTmpDir returns a nicely-named directory for temp files in $TMPDIR
217 func (xformer *pdfXformer) makeTmpDir() (string, error) {
218 if xformer.identifier == "" {
219 var host, userName string
220 if h, err := os.Hostname(); err == nil {
221 host = h
222 if i := strings.Index(host, "."); i >= 0 {
223 host = host[:i]
224 }
225 }
226 if currentUser, err := user.Current(); err == nil {
227 userName = currentUser.Username
228 }
229 userName = strings.Replace(userName, `\`, "_", -1)
230 xformer.identifier = fmt.Sprintf("%s.%s.%s.tmp.%d.", filepath.Ba se(os.Args[0]), host, userName, os.Getpid())
231 }
232 return ioutil.TempDir(*dataDir, xformer.identifier)
233 }
234
235 func newResult(key map[string]string, rasterizerName, digest string) *goldingest er.Result {
236 keyCopy := map[string]string{}
237 for k, v := range key {
238 keyCopy[k] = v
239 }
240 keyCopy["rasterizer"] = rasterizerName
241 options := map[string]string{"ext": PNG_EXT}
242 return &goldingester.Result{Key: keyCopy, Digest: digest, Options: optio ns}
243 }
244
245 // processResult rasterizes a single PDF result and returns a set of new results .
246 func (xformer *pdfXformer) processResult(res goldingester.Result) []*goldingeste r.Result {
247 rasterizedResults := []*goldingester.Result{}
248 resultMap, found := xformer.results[res.Digest]
249 if found {
250 // Skip rasterizion steps: big win.
251 for index, rasterizer := range xformer.rasterizers {
252 digest, ok := resultMap[index]
253 if ok {
254 rasterizedResults = append(rasterizedResults,
255 newResult(res.Key, rasterizer.String(), digest))
256 } else {
257 glog.Errorf("missing rasterizer %s on %s", raste rizer.String(), res.Digest)
258 }
259 }
260 return rasterizedResults
261 }
262
263 tempdir, err := xformer.makeTmpDir()
264 if err != nil {
265 glog.Errorf("error making temp directory: %s", err)
266 return rasterizedResults
267 }
268 defer util.RemoveAll(tempdir)
269 pdfPath := path.Join(tempdir, fmt.Sprintf("%s.pdf", res.Digest))
270 objectName := fmt.Sprintf("%s/%s.pdf", *storageImagesDirectory, res.Dige st)
271 storageURL := fmt.Sprintf("gs://%s/%s", *storageBucket, objectName)
272 object, err := xformer.client.storageService.Objects.Get(*storageBucket, objectName).Do()
273 if err != nil {
274 glog.Errorf("unable to find %s: %s", storageURL, err)
275 return []*goldingester.Result{}
276 }
277 pdfData, _, err := gsFetch(object, xformer.client)
278 if err != nil {
279 glog.Errorf("unable to retrieve %s: %s", storageURL, err)
280 return []*goldingester.Result{}
281 }
282 writeTo(pdfPath, &pdfData)
283 if !isPDF(pdfPath) {
284 glog.Errorf("%s is not a PDF", objectName)
285 return []*goldingester.Result{}
286 }
287 resultMap = map[int]string{}
288 for index, rasterizer := range xformer.rasterizers {
289 digest, err := xformer.rasterizeOnce(pdfPath, index)
290 if err != nil {
291 glog.Errorf("rasterizer %s failed on %s.pdf: %s", raster izer, res.Digest, err)
292 continue
293 }
294 rasterizedResults = append(rasterizedResults,
295 newResult(res.Key, rasterizer.String(), digest))
296 resultMap[index] = digest
297 }
298 xformer.results[res.Digest] = resultMap
299 return rasterizedResults
300 }
301
302 // processJsonFile reads a json file and produces a new json file
303 // with rasterized results.
304 func (xformer *pdfXformer) processJsonFile(jsonFileObject *storage.Object) {
305 jsonURL := fmt.Sprintf("gs://%s/%s", *storageBucket, jsonFileObject.Name )
306 if jsonFileObject.Metadata["rasterized"] == "true" {
307 glog.Infof("> > skipping %s (already processed) {%d}", jsonURL, xformer.counter)
308 return
309 }
310 body, length, err := gsFetch(jsonFileObject, xformer.client)
311 if err != nil {
312 glog.Errorf("Failed to fetch %s", jsonURL)
313 return
314 }
315 if 0 == length {
316 util.Close(body)
317 glog.Infof("> > skipping %s (empty file) {%d}", jsonURL, xformer .counter)
318 return
319 }
320 dmstruct := goldingester.DMResults{}
321 err = json.NewDecoder(body).Decode(&dmstruct)
322 util.Close(body)
323 if err != nil {
324 glog.Errorf("Failed to parse %s", jsonURL)
325 return
326 }
327 countPdfResults := 0
328 for _, res := range dmstruct.Results {
329 if res.Options["ext"] == PDF_EXT {
330 countPdfResults++
331 }
332 }
333 if 0 == countPdfResults {
334 glog.Infof("> > 0 PDFs found %s {%d}", jsonURL, xformer.counter)
335 xformer.setRasterized(jsonFileObject)
336 return
337 }
338
339 glog.Infof("> > processing %d pdfs of %d results {%d}", countPdfResults, len(dmstruct.Results), xformer.counter)
340 rasterizedResults := []*goldingester.Result{}
341 i := 0
342 for _, res := range dmstruct.Results {
343 if res.Options["ext"] == PDF_EXT {
344 i++
345 glog.Infof("> > > processing %s.pdf [%d/%d] {%d}", res.D igest, i, countPdfResults, xformer.counter)
346 rasterizedResults = append(rasterizedResults, xformer.pr ocessResult(*res)...)
347 }
348 }
349 newDMStruct := goldingester.DMResults{
350 BuildNumber: dmstruct.BuildNumber,
351 GitHash: dmstruct.GitHash,
352 Key: dmstruct.Key,
353 Results: rasterizedResults,
354 }
355 newJson, err := json.Marshal(newDMStruct)
356 if err != nil {
357 glog.Errorf("Unexpected json.Marshal error: %s", err)
358 return
359 }
360
361 now := time.Now()
362 // Change the date; leave most of the rest of the path components.
363 jsonPathComponents := strings.Split(jsonFileObject.Name, "/") // []strin g
364 if len(jsonPathComponents) < 4 {
365 fmt.Errorf("unexpected number of path components %q", jsonPathCo mponents)
366 return
367 }
368 jsonPathComponents = jsonPathComponents[len(jsonPathComponents)-4:]
369 jsonPathComponents[1] += "-pdfxformer"
370 jsonUploadPath := fmt.Sprintf("%s/%d/%02d/%02d/%02d/%s",
371 *storageJsonDirectory,
372 now.Year(),
373 int(now.Month()),
374 now.Day(),
375 now.Hour(),
376 strings.Join(jsonPathComponents, "/"))
377
378 _, err = uploadFile(xformer.client, bytes.NewReader(newJson), *storageBu cket, jsonUploadPath, *accessControlEntity)
379 glog.Infof("> > wrote gs://%s/%s", *storageBucket, jsonUploadPath)
380 newJsonFileObject, err := xformer.client.storageService.Objects.Get(*sto rageBucket, jsonUploadPath).Do()
381 if err != nil {
382 glog.Errorf("Failed to find %s: %s", jsonUploadPath, err)
383 } else {
384 xformer.setRasterized(newJsonFileObject)
385 }
386 xformer.setRasterized(jsonFileObject)
387 }
388
389 // setRasterized sets the rasterized metadata flag of the given storage.Object
390 func (xformer *pdfXformer) setRasterized(jsonFileObject *storage.Object) {
391 if nil == jsonFileObject.Metadata {
392 jsonFileObject.Metadata = map[string]string{}
393 }
394 jsonFileObject.Metadata["rasterized"] = "true"
395 _, err := xformer.client.storageService.Objects.Patch(*storageBucket, js onFileObject.Name, jsonFileObject).Do()
396 if err != nil {
397 glog.Errorf("Failed to update metadata of %s: %s", jsonFileObjec t.Name, err)
398 } else {
399 glog.Infof("> > Updated metadata of %s", jsonFileObject.Name)
400 }
401 }
402
403 // processTimeRange calls gs.GetLatestGSDirs to get a list of
404 func (xformer *pdfXformer) processTimeRange(start time.Time, end time.Time) {
405 glog.Infof("Processing time range: (%s, %s)", start.Truncate(time.Second ), end.Truncate(time.Second))
406 for _, dir := range gs.GetLatestGSDirs(start.Unix(), end.Unix(), *storag eJsonDirectory) {
407 glog.Infof("> Reading gs://%s/%s\n", *storageBucket, dir)
408 requestedObjects := xformer.client.storageService.Objects.List(* storageBucket).Prefix(dir).Fields(
409 "nextPageToken", "items/updated", "items/md5Hash", "item s/mediaLink", "items/name", "items/metadata")
410 for requestedObjects != nil {
411 responseObjects, err := requestedObjects.Do()
412 if err != nil {
413 glog.Errorf("request %#v failed: %s", requestedO bjects, err)
414 } else {
415 for _, jsonObject := range responseObjects.Items {
416 xformer.counter++
417 glog.Infof("> > Processing object: gs:/ /%s/%s {%d}", *storageBucket, jsonObject.Name, xformer.counter)
418 xformer.processJsonFile(jsonObject)
419 }
420 }
421 if len(responseObjects.NextPageToken) > 0 {
422 requestedObjects.PageToken(responseObjects.NextP ageToken)
423 } else {
424 requestedObjects = nil
425 }
426 }
427 }
428 glog.Infof("finished time range.")
429 }
430
431 // uploadErrorImage should be run once to verify that the image is there
432 func (xformer *pdfXformer) uploadErrorImage(path string) error {
433 if "" == path {
434 glog.Fatalf("Missing --path argument")
435 }
436 errorImageMd5, err := md5OfFile(path)
437 if err != nil {
438 glog.Fatalf("Bad --path argument")
439 }
440 errorImageFileReader, err := os.Open(path)
441 if err != nil {
442 return err
443 }
444 defer util.Close(errorImageFileReader)
445 errorImagePath := fmt.Sprintf("%s/%s.png", *storageImagesDirectory, erro rImageMd5)
446 _, err = uploadFile(xformer.client, errorImageFileReader, *storageBucket , errorImagePath, *accessControlEntity)
447 if err != nil {
448 return err
449 }
450 xformer.errorImageMd5 = errorImageMd5
451 return nil
452 }
453
454 func main() {
455 flag.Parse()
456 common.InitWithMetrics("pdfxform", graphiteServer)
457
458 client, err := getClient()
459 if err != nil {
460 glog.Fatal(err)
461 }
462 xformer := pdfXformer{
463 client: client,
464 results: map[string]map[int]string{},
465 }
466
467 err = xformer.uploadErrorImage(*failureImage)
468 if err != nil {
469 // If we can't upload this, we can't upload anything.
470 glog.Fatalf("Filed to upload error image: %s", err)
471 }
472
473 for _, rasterizer := range []pdf.Rasterizer{pdf.Pdfium{}, pdf.Poppler{}} {
474 if rasterizer.Enabled() {
475 xformer.rasterizers = append(xformer.rasterizers, raster izer)
476 } else {
477 glog.Infof("rasterizer %s is disabled", rasterizer.Strin g())
478 }
479 }
480 if len(xformer.rasterizers) == 0 {
481 glog.Fatalf("no rasterizers found")
482 }
483
484 end := time.Now()
485 start := end.Add(-172 * time.Hour)
486 xformer.processTimeRange(start, end)
487 glog.Flush() // Flush before waiting for next tick; it may be a while.
488 for _ = range time.Tick(time.Minute) {
489 start, end = end, time.Now()
490 xformer.processTimeRange(start, end)
491 glog.Flush()
492 }
493 }
OLDNEW
« no previous file with comments | « go/pdf/testdata/minimal.pdf ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698