OLD | NEW |
(Empty) | |
| 1 // pdfxform is a server that rasterizes PDF documents into PNG |
| 2 package main |
| 3 |
| 4 import ( |
| 5 "bytes" |
| 6 "crypto/md5" |
| 7 "encoding/hex" |
| 8 "encoding/json" |
| 9 "flag" |
| 10 "fmt" |
| 11 "io" |
| 12 "io/ioutil" |
| 13 "net/http" |
| 14 "os" |
| 15 "os/user" |
| 16 "path" |
| 17 "path/filepath" |
| 18 "strings" |
| 19 "time" |
| 20 |
| 21 "github.com/skia-dev/glog" |
| 22 "go.skia.org/infra/go/auth" |
| 23 "go.skia.org/infra/go/common" |
| 24 "go.skia.org/infra/go/gs" |
| 25 "go.skia.org/infra/go/pdf" |
| 26 "go.skia.org/infra/go/util" |
| 27 "go.skia.org/infra/perf/go/goldingester" |
| 28 "google.golang.org/api/storage/v1" |
| 29 ) |
| 30 |
| 31 //////////////////////////////////////////////////////////////////////////////// |
| 32 |
// File extensions (without the leading dot). They are used both as the
// "ext" option of a result and as the suffix of object names in storage.
const (
	PNG_EXT = "png" // extension of the rasterized images this server produces
	PDF_EXT = "pdf" // extension of the documents this server consumes
)
| 37 |
| 38 //////////////////////////////////////////////////////////////////////////////// |
| 39 |
| 40 // md5OfFile calculates the MD5 checksum of a file. |
| 41 func md5OfFile(path string) (string, error) { |
| 42 md5 := md5.New() |
| 43 f, err := os.Open(path) |
| 44 if err != nil { |
| 45 return "", err |
| 46 } |
| 47 defer util.Close(f) |
| 48 if _, err = io.Copy(md5, f); err != nil { |
| 49 return "", err |
| 50 } |
| 51 return hex.EncodeToString(md5.Sum(nil)), nil |
| 52 } |
| 53 |
| 54 // removeIfExists is like util.Remove, but logs no error if the file does not ex
ist. |
| 55 func removeIfExists(path string) { |
| 56 if err := os.Remove(path); err != nil { |
| 57 if !os.IsNotExist(err) { |
| 58 glog.Errorf("Failed to Remove(%s): %v", path, err) |
| 59 } |
| 60 } |
| 61 } |
| 62 |
| 63 // isPDF returns true if the path appears to point to a PDF file. |
| 64 func isPDF(path string) bool { |
| 65 f, err := os.Open(path) |
| 66 if err != nil { |
| 67 return false |
| 68 } |
| 69 defer util.Close(f) |
| 70 buffer := make([]byte, 4) |
| 71 if n, err := f.Read(buffer); n != 4 || err != nil { |
| 72 return false |
| 73 } |
| 74 return string(buffer) == "%PDF" |
| 75 } |
| 76 |
| 77 // writeTo opens a file and dumps the contents of the reader into it. |
| 78 func writeTo(path string, reader *io.ReadCloser) error { |
| 79 defer util.Close(*reader) |
| 80 file, err := os.Create(path) |
| 81 if err == nil { |
| 82 _, err = io.Copy(file, *reader) |
| 83 } |
| 84 return err |
| 85 } |
| 86 |
| 87 //////////////////////////////////////////////////////////////////////////////// |
| 88 |
// storageClient bundles an HTTP client with the storage service built on
// top of it. It is used for downloading objects from, and uploading objects
// to, cloud storage.
type storageClient struct {
	httpClient     *http.Client     // performs the media download requests in gsFetch
	storageService *storage.Service // performs the Objects and ObjectAccessControls API calls
}
| 94 |
| 95 // getClient returns an authorized storage.Service and the |
| 96 // corresponding http.Client; if anything goes wrong, it logs a fatal |
| 97 // error. |
| 98 func getClient() (storageClient, error) { |
| 99 var client *http.Client |
| 100 var err error |
| 101 if *local { |
| 102 client, err = auth.RunFlow(auth.OAuthConfig(*oauthCacheFile, aut
h.SCOPE_FULL_CONTROL)) |
| 103 // TODO(stephana): Replace auth.RunFlow with auth.NewClient |
| 104 // client, err = auth.NewClient(true, *oauthCacheFile, auth.SCOP
E_FULL_CONTROL, auth.SCOPE_GCE) |
| 105 } else { |
| 106 client = auth.GCEServiceAccountClient(&http.Transport{Dial: util
.DialTimeout}) |
| 107 } |
| 108 if err != nil { |
| 109 return storageClient{}, err |
| 110 } |
| 111 gsService, err := storage.New(client) |
| 112 if err != nil { |
| 113 return storageClient{}, err |
| 114 } |
| 115 return storageClient{httpClient: client, storageService: gsService}, nil |
| 116 } |
| 117 |
| 118 // gsFetch fetch the object's data from google storage |
| 119 func gsFetch(object *storage.Object, sc storageClient) (io.ReadCloser, int64, er
ror) { |
| 120 request, err := gs.RequestForStorageURL(object.MediaLink) |
| 121 if err != nil { |
| 122 return nil, -1, err |
| 123 } |
| 124 resp, err := sc.httpClient.Do(request) |
| 125 if err != nil { |
| 126 return nil, -1, err |
| 127 } |
| 128 if resp.StatusCode != 200 { |
| 129 _ = resp.Body.Close() |
| 130 return nil, -1, fmt.Errorf("Failed to retrieve: %s %d %s", objec
t.MediaLink, resp.StatusCode, resp.Status) |
| 131 } |
| 132 return resp.Body, resp.ContentLength, nil |
| 133 } |
| 134 |
| 135 // uploadFile uploads the specified file to the remote dir in Google |
| 136 // Storage. It also sets the appropriate ACLs on the uploaded file. |
| 137 // If the file already exists on the server, do nothing. |
| 138 func uploadFile(sc storageClient, input io.Reader, storageBucket, storagePath, a
ccessControlEntity string) (bool, error) { |
| 139 obj, _ := sc.storageService.Objects.Get(storageBucket, storagePath).Do() |
| 140 if obj != nil { |
| 141 return false, nil // noclobber |
| 142 } |
| 143 fullPath := fmt.Sprintf("gs://%s/%s", storageBucket, storagePath) |
| 144 object := &storage.Object{Name: storagePath} |
| 145 if _, err := sc.storageService.Objects.Insert(storageBucket, object).Med
ia(input).Do(); err != nil { |
| 146 return false, fmt.Errorf("Objects.Insert(%s) failed: %s", fullPa
th, err) |
| 147 } |
| 148 objectAcl := &storage.ObjectAccessControl{ |
| 149 Bucket: storageBucket, Entity: accessControlEntity, Object: stor
agePath, Role: "READER", |
| 150 } |
| 151 if _, err := sc.storageService.ObjectAccessControls.Insert(storageBucket
, storagePath, objectAcl).Do(); err != nil { |
| 152 return false, fmt.Errorf("Could not update ACL of %s: %s", fullP
ath, err) |
| 153 } |
| 154 return true, nil |
| 155 } |
| 156 |
| 157 //////////////////////////////////////////////////////////////////////////////// |
| 158 |
// Command-line flags. They are read through their pointers after
// flag.Parse has run in main.
var (
	local                  = flag.Bool("local", false, "Set to true if not running in prod")
	oauthCacheFile         = flag.String("oauth_cache_file", "oauth_cache.dat", "Path to look for and store an OAuth token")
	dataDir                = flag.String("data_dir", "", "Directory to store data in.")
	failureImage           = flag.String("failure_image", "", "Location of a PNG image; must be set")
	storageBucket          = flag.String("storage_bucket", "chromium-skia-gm", "The bucket for json, pdf, and png files")
	storageJsonDirectory   = flag.String("storage_json_directory", "dm-json-v1", "The directory on bucket for json files.")
	storageImagesDirectory = flag.String("storage_images_directory", "dm-images-v1", "The directory on bucket for png and pdf files.")
	accessControlEntity    = flag.String("access_control_entity", "domain-google.com", "The entity that has permissions to manage the bucket")
	graphiteServer         = flag.String("graphite_server", "skia-monitoring:2003", "Where the Graphite metrics ingestion server is running")
)
| 170 |
// The pdfXformer struct holds the state shared by all processing steps.
type pdfXformer struct {
	client        storageClient             // used for every download and upload
	rasterizers   []pdf.Rasterizer          // the enabled rasterizers, filled in by main
	results       map[string]map[int]string // PDF digest -> (index into rasterizers -> PNG digest); cache of finished work
	counter       int                       // number of JSON objects seen so far; only used in log messages
	identifier    string                    // temp-directory name prefix, computed once by makeTmpDir
	errorImageMd5 string                    // digest of the failure image, reported when a rasterizer fails
}
| 180 |
| 181 // rasterizeOnce applies a single rastetizer to the given pdf file. |
| 182 // If the rasterizer fails, use the errorImage. If everything |
| 183 // succeeds, upload the PNG. |
| 184 func (xformer *pdfXformer) rasterizeOnce(pdfPath string, rasterizerIndex int) (s
tring, error) { |
| 185 rasterizer := xformer.rasterizers[rasterizerIndex] |
| 186 tempdir := filepath.Dir(pdfPath) |
| 187 pngPath := path.Join(tempdir, fmt.Sprintf("%s.%s", rasterizer.String(),
PNG_EXT)) |
| 188 defer removeIfExists(pngPath) |
| 189 glog.Infof("> > > > rasterizing with %s", rasterizer) |
| 190 err := rasterizer.Rasterize(pdfPath, pngPath) |
| 191 if err != nil { |
| 192 glog.Warningf("rasterizing %s with %s failed: %s", filepath.Base
(pdfPath), rasterizer.String(), err) |
| 193 return xformer.errorImageMd5, nil |
| 194 } |
| 195 md5, err := md5OfFile(pngPath) |
| 196 if err != nil { |
| 197 return "", err |
| 198 } |
| 199 f, err := os.Open(pngPath) |
| 200 if err != nil { |
| 201 return "", err |
| 202 } |
| 203 defer util.Close(f) |
| 204 pngUploadPath := fmt.Sprintf("%s/%s.%s", *storageImagesDirectory, md5, P
NG_EXT) |
| 205 didUpload, err := uploadFile(xformer.client, f, *storageBucket, pngUploa
dPath, *accessControlEntity) |
| 206 if err != nil { |
| 207 return "", err |
| 208 } |
| 209 if didUpload { |
| 210 glog.Infof("> > > > uploaded %s", pngUploadPath) |
| 211 } |
| 212 return md5, nil |
| 213 } |
| 214 |
| 215 // makeTmpDir returns a nicely-named directory for temp files in $TMPDIR |
| 216 func (xformer *pdfXformer) makeTmpDir() (string, error) { |
| 217 if xformer.identifier == "" { |
| 218 var host, userName string |
| 219 if h, err := os.Hostname(); err == nil { |
| 220 host = h |
| 221 if i := strings.Index(host, "."); i >= 0 { |
| 222 host = host[:i] |
| 223 } |
| 224 } |
| 225 if currentUser, err := user.Current(); err == nil { |
| 226 userName = currentUser.Username |
| 227 } |
| 228 userName = strings.Replace(userName, `\`, "_", -1) |
| 229 xformer.identifier = fmt.Sprintf("%s.%s.%s.tmp.%d.", filepath.Ba
se(os.Args[0]), host, userName, os.Getpid()) |
| 230 } |
| 231 return ioutil.TempDir(*dataDir, xformer.identifier) |
| 232 } |
| 233 |
| 234 func newResult(key map[string]string, rasterizerName, digest string) *goldingest
er.Result { |
| 235 keyCopy := map[string]string{} |
| 236 for k, v := range key { |
| 237 keyCopy[k] = v |
| 238 } |
| 239 keyCopy["rasterizer"] = rasterizerName |
| 240 options := map[string]string{"ext": PNG_EXT} |
| 241 return &goldingester.Result{Key: keyCopy, Digest: digest, Options: optio
ns} |
| 242 } |
| 243 |
| 244 // processResult rasterizes a single PDF result and returns a set of new results
. |
| 245 func (xformer *pdfXformer) processResult(res goldingester.Result) []*goldingeste
r.Result { |
| 246 rasterizedResults := []*goldingester.Result{} |
| 247 resultMap, found := xformer.results[res.Digest] |
| 248 if found { |
| 249 // Skip rasterizion steps: big win. |
| 250 for index, rasterizer := range xformer.rasterizers { |
| 251 digest, ok := resultMap[index] |
| 252 if ok { |
| 253 rasterizedResults = append(rasterizedResults, |
| 254 newResult(res.Key, rasterizer.String(),
digest)) |
| 255 } else { |
| 256 glog.Errorf("missing rasterizer %s on %s", raste
rizer.String(), res.Digest) |
| 257 } |
| 258 } |
| 259 return rasterizedResults |
| 260 } |
| 261 |
| 262 tempdir, err := xformer.makeTmpDir() |
| 263 if err != nil { |
| 264 glog.Errorf("error making temp directory: %s", err) |
| 265 return rasterizedResults |
| 266 } |
| 267 defer util.RemoveAll(tempdir) |
| 268 pdfPath := path.Join(tempdir, fmt.Sprintf("%s.pdf", res.Digest)) |
| 269 objectName := fmt.Sprintf("%s/%s.pdf", *storageImagesDirectory, res.Dige
st) |
| 270 storageURL := fmt.Sprintf("gs://%s/%s", *storageBucket, objectName) |
| 271 object, err := xformer.client.storageService.Objects.Get(*storageBucket,
objectName).Do() |
| 272 if err != nil { |
| 273 glog.Errorf("unable to find %s: %s", storageURL, err) |
| 274 return []*goldingester.Result{} |
| 275 } |
| 276 pdfData, _, err := gsFetch(object, xformer.client) |
| 277 if err != nil { |
| 278 glog.Errorf("unable to retrieve %s: %s", storageURL, err) |
| 279 return []*goldingester.Result{} |
| 280 } |
| 281 err = writeTo(pdfPath, &pdfData) |
| 282 if err != nil { |
| 283 glog.Errorf("unable to write file %s: %s", pdfPath, err) |
| 284 return []*goldingester.Result{} |
| 285 } |
| 286 if !isPDF(pdfPath) { |
| 287 glog.Errorf("%s is not a PDF", objectName) |
| 288 return []*goldingester.Result{} |
| 289 } |
| 290 resultMap = map[int]string{} |
| 291 for index, rasterizer := range xformer.rasterizers { |
| 292 digest, err := xformer.rasterizeOnce(pdfPath, index) |
| 293 if err != nil { |
| 294 glog.Errorf("rasterizer %s failed on %s.pdf: %s", raster
izer, res.Digest, err) |
| 295 continue |
| 296 } |
| 297 rasterizedResults = append(rasterizedResults, |
| 298 newResult(res.Key, rasterizer.String(), digest)) |
| 299 resultMap[index] = digest |
| 300 } |
| 301 xformer.results[res.Digest] = resultMap |
| 302 return rasterizedResults |
| 303 } |
| 304 |
| 305 // processJsonFile reads a json file and produces a new json file |
| 306 // with rasterized results. |
| 307 func (xformer *pdfXformer) processJsonFile(jsonFileObject *storage.Object) { |
| 308 jsonURL := fmt.Sprintf("gs://%s/%s", *storageBucket, jsonFileObject.Name
) |
| 309 if jsonFileObject.Metadata["rasterized"] == "true" { |
| 310 glog.Infof("> > skipping %s (already processed) {%d}", jsonURL,
xformer.counter) |
| 311 return |
| 312 } |
| 313 body, length, err := gsFetch(jsonFileObject, xformer.client) |
| 314 if err != nil { |
| 315 glog.Errorf("Failed to fetch %s", jsonURL) |
| 316 return |
| 317 } |
| 318 if 0 == length { |
| 319 util.Close(body) |
| 320 glog.Infof("> > skipping %s (empty file) {%d}", jsonURL, xformer
.counter) |
| 321 return |
| 322 } |
| 323 dmstruct := goldingester.DMResults{} |
| 324 err = json.NewDecoder(body).Decode(&dmstruct) |
| 325 util.Close(body) |
| 326 if err != nil { |
| 327 glog.Errorf("Failed to parse %s", jsonURL) |
| 328 return |
| 329 } |
| 330 countPdfResults := 0 |
| 331 for _, res := range dmstruct.Results { |
| 332 if res.Options["ext"] == PDF_EXT { |
| 333 countPdfResults++ |
| 334 } |
| 335 } |
| 336 if 0 == countPdfResults { |
| 337 glog.Infof("> > 0 PDFs found %s {%d}", jsonURL, xformer.counter) |
| 338 xformer.setRasterized(jsonFileObject) |
| 339 return |
| 340 } |
| 341 |
| 342 glog.Infof("> > processing %d pdfs of %d results {%d}", countPdfResults,
len(dmstruct.Results), xformer.counter) |
| 343 rasterizedResults := []*goldingester.Result{} |
| 344 i := 0 |
| 345 for _, res := range dmstruct.Results { |
| 346 if res.Options["ext"] == PDF_EXT { |
| 347 i++ |
| 348 glog.Infof("> > > processing %s.pdf [%d/%d] {%d}", res.D
igest, i, countPdfResults, xformer.counter) |
| 349 rasterizedResults = append(rasterizedResults, xformer.pr
ocessResult(*res)...) |
| 350 } |
| 351 } |
| 352 newDMStruct := goldingester.DMResults{ |
| 353 BuildNumber: dmstruct.BuildNumber, |
| 354 GitHash: dmstruct.GitHash, |
| 355 Key: dmstruct.Key, |
| 356 Results: rasterizedResults, |
| 357 } |
| 358 newJson, err := json.Marshal(newDMStruct) |
| 359 if err != nil { |
| 360 glog.Errorf("Unexpected json.Marshal error: %s", err) |
| 361 return |
| 362 } |
| 363 |
| 364 now := time.Now() |
| 365 // Change the date; leave most of the rest of the path components. |
| 366 jsonPathComponents := strings.Split(jsonFileObject.Name, "/") // []strin
g |
| 367 if len(jsonPathComponents) < 4 { |
| 368 fmt.Errorf("unexpected number of path components %q", jsonPathCo
mponents) |
| 369 return |
| 370 } |
| 371 jsonPathComponents = jsonPathComponents[len(jsonPathComponents)-4:] |
| 372 jsonPathComponents[1] += "-pdfxformer" |
| 373 jsonUploadPath := fmt.Sprintf("%s/%d/%02d/%02d/%02d/%s", |
| 374 *storageJsonDirectory, |
| 375 now.Year(), |
| 376 int(now.Month()), |
| 377 now.Day(), |
| 378 now.Hour(), |
| 379 strings.Join(jsonPathComponents, "/")) |
| 380 |
| 381 _, err = uploadFile(xformer.client, bytes.NewReader(newJson), *storageBu
cket, jsonUploadPath, *accessControlEntity) |
| 382 glog.Infof("> > wrote gs://%s/%s", *storageBucket, jsonUploadPath) |
| 383 newJsonFileObject, err := xformer.client.storageService.Objects.Get(*sto
rageBucket, jsonUploadPath).Do() |
| 384 if err != nil { |
| 385 glog.Errorf("Failed to find %s: %s", jsonUploadPath, err) |
| 386 } else { |
| 387 xformer.setRasterized(newJsonFileObject) |
| 388 } |
| 389 xformer.setRasterized(jsonFileObject) |
| 390 } |
| 391 |
| 392 // setRasterized sets the rasterized metadata flag of the given storage.Object |
| 393 func (xformer *pdfXformer) setRasterized(jsonFileObject *storage.Object) { |
| 394 if nil == jsonFileObject.Metadata { |
| 395 jsonFileObject.Metadata = map[string]string{} |
| 396 } |
| 397 jsonFileObject.Metadata["rasterized"] = "true" |
| 398 _, err := xformer.client.storageService.Objects.Patch(*storageBucket, js
onFileObject.Name, jsonFileObject).Do() |
| 399 if err != nil { |
| 400 glog.Errorf("Failed to update metadata of %s: %s", jsonFileObjec
t.Name, err) |
| 401 } else { |
| 402 glog.Infof("> > Updated metadata of %s", jsonFileObject.Name) |
| 403 } |
| 404 } |
| 405 |
| 406 // processTimeRange calls gs.GetLatestGSDirs to get a list of |
| 407 func (xformer *pdfXformer) processTimeRange(start time.Time, end time.Time) { |
| 408 glog.Infof("Processing time range: (%s, %s)", start.Truncate(time.Second
), end.Truncate(time.Second)) |
| 409 for _, dir := range gs.GetLatestGSDirs(start.Unix(), end.Unix(), *storag
eJsonDirectory) { |
| 410 glog.Infof("> Reading gs://%s/%s\n", *storageBucket, dir) |
| 411 requestedObjects := xformer.client.storageService.Objects.List(*
storageBucket).Prefix(dir).Fields( |
| 412 "nextPageToken", "items/updated", "items/md5Hash", "item
s/mediaLink", "items/name", "items/metadata") |
| 413 for requestedObjects != nil { |
| 414 responseObjects, err := requestedObjects.Do() |
| 415 if err != nil { |
| 416 glog.Errorf("request %#v failed: %s", requestedO
bjects, err) |
| 417 } else { |
| 418 for _, jsonObject := range responseObjects.Items
{ |
| 419 xformer.counter++ |
| 420 glog.Infof("> > Processing object: gs:/
/%s/%s {%d}", *storageBucket, jsonObject.Name, xformer.counter) |
| 421 xformer.processJsonFile(jsonObject) |
| 422 } |
| 423 } |
| 424 if len(responseObjects.NextPageToken) > 0 { |
| 425 requestedObjects.PageToken(responseObjects.NextP
ageToken) |
| 426 } else { |
| 427 requestedObjects = nil |
| 428 } |
| 429 } |
| 430 } |
| 431 glog.Infof("finished time range.") |
| 432 } |
| 433 |
| 434 // uploadErrorImage should be run once to verify that the image is there |
| 435 func (xformer *pdfXformer) uploadErrorImage(path string) error { |
| 436 if "" == path { |
| 437 glog.Fatalf("Missing --path argument") |
| 438 } |
| 439 errorImageMd5, err := md5OfFile(path) |
| 440 if err != nil { |
| 441 glog.Fatalf("Bad --path argument") |
| 442 } |
| 443 errorImageFileReader, err := os.Open(path) |
| 444 if err != nil { |
| 445 return err |
| 446 } |
| 447 defer util.Close(errorImageFileReader) |
| 448 errorImagePath := fmt.Sprintf("%s/%s.png", *storageImagesDirectory, erro
rImageMd5) |
| 449 _, err = uploadFile(xformer.client, errorImageFileReader, *storageBucket
, errorImagePath, *accessControlEntity) |
| 450 if err != nil { |
| 451 return err |
| 452 } |
| 453 xformer.errorImageMd5 = errorImageMd5 |
| 454 return nil |
| 455 } |
| 456 |
| 457 func main() { |
| 458 flag.Parse() |
| 459 common.InitWithMetrics("pdfxform", graphiteServer) |
| 460 |
| 461 client, err := getClient() |
| 462 if err != nil { |
| 463 glog.Fatal(err) |
| 464 } |
| 465 xformer := pdfXformer{ |
| 466 client: client, |
| 467 results: map[string]map[int]string{}, |
| 468 } |
| 469 |
| 470 err = xformer.uploadErrorImage(*failureImage) |
| 471 if err != nil { |
| 472 // If we can't upload this, we can't upload anything. |
| 473 glog.Fatalf("Filed to upload error image: %s", err) |
| 474 } |
| 475 |
| 476 for _, rasterizer := range []pdf.Rasterizer{pdf.Pdfium{}, pdf.Poppler{}}
{ |
| 477 if rasterizer.Enabled() { |
| 478 xformer.rasterizers = append(xformer.rasterizers, raster
izer) |
| 479 } else { |
| 480 glog.Infof("rasterizer %s is disabled", rasterizer.Strin
g()) |
| 481 } |
| 482 } |
| 483 if len(xformer.rasterizers) == 0 { |
| 484 glog.Fatalf("no rasterizers found") |
| 485 } |
| 486 |
| 487 end := time.Now() |
| 488 start := end.Add(-172 * time.Hour) |
| 489 xformer.processTimeRange(start, end) |
| 490 glog.Flush() // Flush before waiting for next tick; it may be a while. |
| 491 for _ = range time.Tick(time.Minute) { |
| 492 start, end = end, time.Now() |
| 493 xformer.processTimeRange(start, end) |
| 494 glog.Flush() |
| 495 } |
| 496 } |
OLD | NEW |