| Index: golden/go/pdfxform/main.go
|
| diff --git a/golden/go/pdfxform/main.go b/golden/go/pdfxform/main.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6b737e83d9efd335d893e8015ae82f08772378d7
|
| --- /dev/null
|
| +++ b/golden/go/pdfxform/main.go
|
| @@ -0,0 +1,496 @@
|
| +// pdfxform is a server that rasterizes PDF documents into PNG
|
| +package main
|
| +
|
| +import (
|
| + "bytes"
|
| + "crypto/md5"
|
| + "encoding/hex"
|
| + "encoding/json"
|
| + "flag"
|
| + "fmt"
|
| + "io"
|
| + "io/ioutil"
|
| + "net/http"
|
| + "os"
|
| + "os/user"
|
| + "path"
|
| + "path/filepath"
|
| + "strings"
|
| + "time"
|
| +
|
| + "github.com/skia-dev/glog"
|
| + "go.skia.org/infra/go/auth"
|
| + "go.skia.org/infra/go/common"
|
| + "go.skia.org/infra/go/gs"
|
| + "go.skia.org/infra/go/pdf"
|
| + "go.skia.org/infra/go/util"
|
| + "go.skia.org/infra/perf/go/goldingester"
|
| + "google.golang.org/api/storage/v1"
|
| +)
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
// PNG_EXT is the file extension (and "ext" option value) used for the
// rasterized images this program produces.
const PNG_EXT = "png"

// PDF_EXT is the "ext" option value that marks a result as a PDF that
// still needs to be rasterized.
const PDF_EXT = "pdf"
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// md5OfFile calculates the MD5 checksum of a file.
|
| +func md5OfFile(path string) (string, error) {
|
| + md5 := md5.New()
|
| + f, err := os.Open(path)
|
| + if err != nil {
|
| + return "", err
|
| + }
|
| + defer util.Close(f)
|
| + if _, err = io.Copy(md5, f); err != nil {
|
| + return "", err
|
| + }
|
| + return hex.EncodeToString(md5.Sum(nil)), nil
|
| +}
|
| +
|
| +// removeIfExists is like util.Remove, but logs no error if the file does not exist.
|
| +func removeIfExists(path string) {
|
| + if err := os.Remove(path); err != nil {
|
| + if !os.IsNotExist(err) {
|
| + glog.Errorf("Failed to Remove(%s): %v", path, err)
|
| + }
|
| + }
|
| +}
|
| +
|
| +// isPDF returns true if the path appears to point to a PDF file.
|
| +func isPDF(path string) bool {
|
| + f, err := os.Open(path)
|
| + if err != nil {
|
| + return false
|
| + }
|
| + defer util.Close(f)
|
| + buffer := make([]byte, 4)
|
| + if n, err := f.Read(buffer); n != 4 || err != nil {
|
| + return false
|
| + }
|
| + return string(buffer) == "%PDF"
|
| +}
|
| +
|
| +// writeTo opens a file and dumps the contents of the reader into it.
|
| +func writeTo(path string, reader *io.ReadCloser) error {
|
| + defer util.Close(*reader)
|
| + file, err := os.Create(path)
|
| + if err == nil {
|
| + _, err = io.Copy(file, *reader)
|
| + }
|
| + return err
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// storageClient struct is used for uploading to cloud storage
|
| +type storageClient struct {
|
| + httpClient *http.Client
|
| + storageService *storage.Service
|
| +}
|
| +
|
| +// getClient returns an authorized storage.Service and the
|
| +// corresponding http.Client; if anything goes wrong, it logs a fatal
|
| +// error.
|
| +func getClient() (storageClient, error) {
|
| + var client *http.Client
|
| + var err error
|
| + if *local {
|
| + client, err = auth.RunFlow(auth.OAuthConfig(*oauthCacheFile, auth.SCOPE_FULL_CONTROL))
|
| + // TODO(stephana): Replace auth.RunFlow with auth.NewClient
|
| + // client, err = auth.NewClient(true, *oauthCacheFile, auth.SCOPE_FULL_CONTROL, auth.SCOPE_GCE)
|
| + } else {
|
| + client = auth.GCEServiceAccountClient(&http.Transport{Dial: util.DialTimeout})
|
| + }
|
| + if err != nil {
|
| + return storageClient{}, err
|
| + }
|
| + gsService, err := storage.New(client)
|
| + if err != nil {
|
| + return storageClient{}, err
|
| + }
|
| + return storageClient{httpClient: client, storageService: gsService}, nil
|
| +}
|
| +
|
| +// gsFetch fetch the object's data from google storage
|
| +func gsFetch(object *storage.Object, sc storageClient) (io.ReadCloser, int64, error) {
|
| + request, err := gs.RequestForStorageURL(object.MediaLink)
|
| + if err != nil {
|
| + return nil, -1, err
|
| + }
|
| + resp, err := sc.httpClient.Do(request)
|
| + if err != nil {
|
| + return nil, -1, err
|
| + }
|
| + if resp.StatusCode != 200 {
|
| + _ = resp.Body.Close()
|
| + return nil, -1, fmt.Errorf("Failed to retrieve: %s %d %s", object.MediaLink, resp.StatusCode, resp.Status)
|
| + }
|
| + return resp.Body, resp.ContentLength, nil
|
| +}
|
| +
|
| +// uploadFile uploads the specified file to the remote dir in Google
|
| +// Storage. It also sets the appropriate ACLs on the uploaded file.
|
| +// If the file already exists on the server, do nothing.
|
| +func uploadFile(sc storageClient, input io.Reader, storageBucket, storagePath, accessControlEntity string) (bool, error) {
|
| + obj, _ := sc.storageService.Objects.Get(storageBucket, storagePath).Do()
|
| + if obj != nil {
|
| + return false, nil // noclobber
|
| + }
|
| + fullPath := fmt.Sprintf("gs://%s/%s", storageBucket, storagePath)
|
| + object := &storage.Object{Name: storagePath}
|
| + if _, err := sc.storageService.Objects.Insert(storageBucket, object).Media(input).Do(); err != nil {
|
| + return false, fmt.Errorf("Objects.Insert(%s) failed: %s", fullPath, err)
|
| + }
|
| + objectAcl := &storage.ObjectAccessControl{
|
| + Bucket: storageBucket, Entity: accessControlEntity, Object: storagePath, Role: "READER",
|
| + }
|
| + if _, err := sc.storageService.ObjectAccessControls.Insert(storageBucket, storagePath, objectAcl).Do(); err != nil {
|
| + return false, fmt.Errorf("Could not update ACL of %s: %s", fullPath, err)
|
| + }
|
| + return true, nil
|
| +}
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
// Command-line flags.
var (
	// Authentication.
	local          = flag.Bool("local", false, "Set to true if not running in prod")
	oauthCacheFile = flag.String("oauth_cache_file", "oauth_cache.dat", "Path to look for and store an OAuth token")

	// Local files and directories.
	dataDir      = flag.String("data_dir", "", "Directory to store data in.")
	failureImage = flag.String("failure_image", "", "Location of a PNG image; must be set")

	// Google Storage layout and permissions.
	storageBucket          = flag.String("storage_bucket", "chromium-skia-gm", "The bucket for json, pdf, and png files")
	storageJsonDirectory   = flag.String("storage_json_directory", "dm-json-v1", "The directory on bucket for json files.")
	storageImagesDirectory = flag.String("storage_images_directory", "dm-images-v1", "The directory on bucket for png and pdf files.")
	accessControlEntity    = flag.String("access_control_entity", "domain-google.com", "The entity that has permissions to manage the bucket")

	// Monitoring.
	graphiteServer = flag.String("graphite_server", "skia-monitoring:2003", "Where the Graphite metrics ingestion server is running")
)
|
| +
|
| +// The pdfXformer struct holds state
|
| +type pdfXformer struct {
|
| + client storageClient
|
| + rasterizers []pdf.Rasterizer
|
| + results map[string]map[int]string
|
| + counter int
|
| + identifier string
|
| + errorImageMd5 string
|
| +}
|
| +
|
| +// rasterizeOnce applies a single rastetizer to the given pdf file.
|
| +// If the rasterizer fails, use the errorImage. If everything
|
| +// succeeds, upload the PNG.
|
| +func (xformer *pdfXformer) rasterizeOnce(pdfPath string, rasterizerIndex int) (string, error) {
|
| + rasterizer := xformer.rasterizers[rasterizerIndex]
|
| + tempdir := filepath.Dir(pdfPath)
|
| + pngPath := path.Join(tempdir, fmt.Sprintf("%s.%s", rasterizer.String(), PNG_EXT))
|
| + defer removeIfExists(pngPath)
|
| + glog.Infof("> > > > rasterizing with %s", rasterizer)
|
| + err := rasterizer.Rasterize(pdfPath, pngPath)
|
| + if err != nil {
|
| + glog.Warningf("rasterizing %s with %s failed: %s", filepath.Base(pdfPath), rasterizer.String(), err)
|
| + return xformer.errorImageMd5, nil
|
| + }
|
| + md5, err := md5OfFile(pngPath)
|
| + if err != nil {
|
| + return "", err
|
| + }
|
| + f, err := os.Open(pngPath)
|
| + if err != nil {
|
| + return "", err
|
| + }
|
| + defer util.Close(f)
|
| + pngUploadPath := fmt.Sprintf("%s/%s.%s", *storageImagesDirectory, md5, PNG_EXT)
|
| + didUpload, err := uploadFile(xformer.client, f, *storageBucket, pngUploadPath, *accessControlEntity)
|
| + if err != nil {
|
| + return "", err
|
| + }
|
| + if didUpload {
|
| + glog.Infof("> > > > uploaded %s", pngUploadPath)
|
| + }
|
| + return md5, nil
|
| +}
|
| +
|
| +// makeTmpDir returns a nicely-named directory for temp files in $TMPDIR
|
| +func (xformer *pdfXformer) makeTmpDir() (string, error) {
|
| + if xformer.identifier == "" {
|
| + var host, userName string
|
| + if h, err := os.Hostname(); err == nil {
|
| + host = h
|
| + if i := strings.Index(host, "."); i >= 0 {
|
| + host = host[:i]
|
| + }
|
| + }
|
| + if currentUser, err := user.Current(); err == nil {
|
| + userName = currentUser.Username
|
| + }
|
| + userName = strings.Replace(userName, `\`, "_", -1)
|
| + xformer.identifier = fmt.Sprintf("%s.%s.%s.tmp.%d.", filepath.Base(os.Args[0]), host, userName, os.Getpid())
|
| + }
|
| + return ioutil.TempDir(*dataDir, xformer.identifier)
|
| +}
|
| +
|
| +func newResult(key map[string]string, rasterizerName, digest string) *goldingester.Result {
|
| + keyCopy := map[string]string{}
|
| + for k, v := range key {
|
| + keyCopy[k] = v
|
| + }
|
| + keyCopy["rasterizer"] = rasterizerName
|
| + options := map[string]string{"ext": PNG_EXT}
|
| + return &goldingester.Result{Key: keyCopy, Digest: digest, Options: options}
|
| +}
|
| +
|
| +// processResult rasterizes a single PDF result and returns a set of new results.
|
| +func (xformer *pdfXformer) processResult(res goldingester.Result) []*goldingester.Result {
|
| + rasterizedResults := []*goldingester.Result{}
|
| + resultMap, found := xformer.results[res.Digest]
|
| + if found {
|
| + // Skip rasterizion steps: big win.
|
| + for index, rasterizer := range xformer.rasterizers {
|
| + digest, ok := resultMap[index]
|
| + if ok {
|
| + rasterizedResults = append(rasterizedResults,
|
| + newResult(res.Key, rasterizer.String(), digest))
|
| + } else {
|
| + glog.Errorf("missing rasterizer %s on %s", rasterizer.String(), res.Digest)
|
| + }
|
| + }
|
| + return rasterizedResults
|
| + }
|
| +
|
| + tempdir, err := xformer.makeTmpDir()
|
| + if err != nil {
|
| + glog.Errorf("error making temp directory: %s", err)
|
| + return rasterizedResults
|
| + }
|
| + defer util.RemoveAll(tempdir)
|
| + pdfPath := path.Join(tempdir, fmt.Sprintf("%s.pdf", res.Digest))
|
| + objectName := fmt.Sprintf("%s/%s.pdf", *storageImagesDirectory, res.Digest)
|
| + storageURL := fmt.Sprintf("gs://%s/%s", *storageBucket, objectName)
|
| + object, err := xformer.client.storageService.Objects.Get(*storageBucket, objectName).Do()
|
| + if err != nil {
|
| + glog.Errorf("unable to find %s: %s", storageURL, err)
|
| + return []*goldingester.Result{}
|
| + }
|
| + pdfData, _, err := gsFetch(object, xformer.client)
|
| + if err != nil {
|
| + glog.Errorf("unable to retrieve %s: %s", storageURL, err)
|
| + return []*goldingester.Result{}
|
| + }
|
| + err = writeTo(pdfPath, &pdfData)
|
| + if err != nil {
|
| + glog.Errorf("unable to write file %s: %s", pdfPath, err)
|
| + return []*goldingester.Result{}
|
| + }
|
| + if !isPDF(pdfPath) {
|
| + glog.Errorf("%s is not a PDF", objectName)
|
| + return []*goldingester.Result{}
|
| + }
|
| + resultMap = map[int]string{}
|
| + for index, rasterizer := range xformer.rasterizers {
|
| + digest, err := xformer.rasterizeOnce(pdfPath, index)
|
| + if err != nil {
|
| + glog.Errorf("rasterizer %s failed on %s.pdf: %s", rasterizer, res.Digest, err)
|
| + continue
|
| + }
|
| + rasterizedResults = append(rasterizedResults,
|
| + newResult(res.Key, rasterizer.String(), digest))
|
| + resultMap[index] = digest
|
| + }
|
| + xformer.results[res.Digest] = resultMap
|
| + return rasterizedResults
|
| +}
|
| +
|
| +// processJsonFile reads a json file and produces a new json file
|
| +// with rasterized results.
|
| +func (xformer *pdfXformer) processJsonFile(jsonFileObject *storage.Object) {
|
| + jsonURL := fmt.Sprintf("gs://%s/%s", *storageBucket, jsonFileObject.Name)
|
| + if jsonFileObject.Metadata["rasterized"] == "true" {
|
| + glog.Infof("> > skipping %s (already processed) {%d}", jsonURL, xformer.counter)
|
| + return
|
| + }
|
| + body, length, err := gsFetch(jsonFileObject, xformer.client)
|
| + if err != nil {
|
| + glog.Errorf("Failed to fetch %s", jsonURL)
|
| + return
|
| + }
|
| + if 0 == length {
|
| + util.Close(body)
|
| + glog.Infof("> > skipping %s (empty file) {%d}", jsonURL, xformer.counter)
|
| + return
|
| + }
|
| + dmstruct := goldingester.DMResults{}
|
| + err = json.NewDecoder(body).Decode(&dmstruct)
|
| + util.Close(body)
|
| + if err != nil {
|
| + glog.Errorf("Failed to parse %s", jsonURL)
|
| + return
|
| + }
|
| + countPdfResults := 0
|
| + for _, res := range dmstruct.Results {
|
| + if res.Options["ext"] == PDF_EXT {
|
| + countPdfResults++
|
| + }
|
| + }
|
| + if 0 == countPdfResults {
|
| + glog.Infof("> > 0 PDFs found %s {%d}", jsonURL, xformer.counter)
|
| + xformer.setRasterized(jsonFileObject)
|
| + return
|
| + }
|
| +
|
| + glog.Infof("> > processing %d pdfs of %d results {%d}", countPdfResults, len(dmstruct.Results), xformer.counter)
|
| + rasterizedResults := []*goldingester.Result{}
|
| + i := 0
|
| + for _, res := range dmstruct.Results {
|
| + if res.Options["ext"] == PDF_EXT {
|
| + i++
|
| + glog.Infof("> > > processing %s.pdf [%d/%d] {%d}", res.Digest, i, countPdfResults, xformer.counter)
|
| + rasterizedResults = append(rasterizedResults, xformer.processResult(*res)...)
|
| + }
|
| + }
|
| + newDMStruct := goldingester.DMResults{
|
| + BuildNumber: dmstruct.BuildNumber,
|
| + GitHash: dmstruct.GitHash,
|
| + Key: dmstruct.Key,
|
| + Results: rasterizedResults,
|
| + }
|
| + newJson, err := json.Marshal(newDMStruct)
|
| + if err != nil {
|
| + glog.Errorf("Unexpected json.Marshal error: %s", err)
|
| + return
|
| + }
|
| +
|
| + now := time.Now()
|
| + // Change the date; leave most of the rest of the path components.
|
| + jsonPathComponents := strings.Split(jsonFileObject.Name, "/") // []string
|
| + if len(jsonPathComponents) < 4 {
|
| + fmt.Errorf("unexpected number of path components %q", jsonPathComponents)
|
| + return
|
| + }
|
| + jsonPathComponents = jsonPathComponents[len(jsonPathComponents)-4:]
|
| + jsonPathComponents[1] += "-pdfxformer"
|
| + jsonUploadPath := fmt.Sprintf("%s/%d/%02d/%02d/%02d/%s",
|
| + *storageJsonDirectory,
|
| + now.Year(),
|
| + int(now.Month()),
|
| + now.Day(),
|
| + now.Hour(),
|
| + strings.Join(jsonPathComponents, "/"))
|
| +
|
| + _, err = uploadFile(xformer.client, bytes.NewReader(newJson), *storageBucket, jsonUploadPath, *accessControlEntity)
|
| + glog.Infof("> > wrote gs://%s/%s", *storageBucket, jsonUploadPath)
|
| + newJsonFileObject, err := xformer.client.storageService.Objects.Get(*storageBucket, jsonUploadPath).Do()
|
| + if err != nil {
|
| + glog.Errorf("Failed to find %s: %s", jsonUploadPath, err)
|
| + } else {
|
| + xformer.setRasterized(newJsonFileObject)
|
| + }
|
| + xformer.setRasterized(jsonFileObject)
|
| +}
|
| +
|
| +// setRasterized sets the rasterized metadata flag of the given storage.Object
|
| +func (xformer *pdfXformer) setRasterized(jsonFileObject *storage.Object) {
|
| + if nil == jsonFileObject.Metadata {
|
| + jsonFileObject.Metadata = map[string]string{}
|
| + }
|
| + jsonFileObject.Metadata["rasterized"] = "true"
|
| + _, err := xformer.client.storageService.Objects.Patch(*storageBucket, jsonFileObject.Name, jsonFileObject).Do()
|
| + if err != nil {
|
| + glog.Errorf("Failed to update metadata of %s: %s", jsonFileObject.Name, err)
|
| + } else {
|
| + glog.Infof("> > Updated metadata of %s", jsonFileObject.Name)
|
| + }
|
| +}
|
| +
|
| +// processTimeRange calls gs.GetLatestGSDirs to get a list of
|
| +func (xformer *pdfXformer) processTimeRange(start time.Time, end time.Time) {
|
| + glog.Infof("Processing time range: (%s, %s)", start.Truncate(time.Second), end.Truncate(time.Second))
|
| + for _, dir := range gs.GetLatestGSDirs(start.Unix(), end.Unix(), *storageJsonDirectory) {
|
| + glog.Infof("> Reading gs://%s/%s\n", *storageBucket, dir)
|
| + requestedObjects := xformer.client.storageService.Objects.List(*storageBucket).Prefix(dir).Fields(
|
| + "nextPageToken", "items/updated", "items/md5Hash", "items/mediaLink", "items/name", "items/metadata")
|
| + for requestedObjects != nil {
|
| + responseObjects, err := requestedObjects.Do()
|
| + if err != nil {
|
| + glog.Errorf("request %#v failed: %s", requestedObjects, err)
|
| + } else {
|
| + for _, jsonObject := range responseObjects.Items {
|
| + xformer.counter++
|
| + glog.Infof("> > Processing object: gs://%s/%s {%d}", *storageBucket, jsonObject.Name, xformer.counter)
|
| + xformer.processJsonFile(jsonObject)
|
| + }
|
| + }
|
| + if len(responseObjects.NextPageToken) > 0 {
|
| + requestedObjects.PageToken(responseObjects.NextPageToken)
|
| + } else {
|
| + requestedObjects = nil
|
| + }
|
| + }
|
| + }
|
| + glog.Infof("finished time range.")
|
| +}
|
| +
|
| +// uploadErrorImage should be run once to verify that the image is there
|
| +func (xformer *pdfXformer) uploadErrorImage(path string) error {
|
| + if "" == path {
|
| + glog.Fatalf("Missing --path argument")
|
| + }
|
| + errorImageMd5, err := md5OfFile(path)
|
| + if err != nil {
|
| + glog.Fatalf("Bad --path argument")
|
| + }
|
| + errorImageFileReader, err := os.Open(path)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + defer util.Close(errorImageFileReader)
|
| + errorImagePath := fmt.Sprintf("%s/%s.png", *storageImagesDirectory, errorImageMd5)
|
| + _, err = uploadFile(xformer.client, errorImageFileReader, *storageBucket, errorImagePath, *accessControlEntity)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + xformer.errorImageMd5 = errorImageMd5
|
| + return nil
|
| +}
|
| +
|
| +func main() {
|
| + flag.Parse()
|
| + common.InitWithMetrics("pdfxform", graphiteServer)
|
| +
|
| + client, err := getClient()
|
| + if err != nil {
|
| + glog.Fatal(err)
|
| + }
|
| + xformer := pdfXformer{
|
| + client: client,
|
| + results: map[string]map[int]string{},
|
| + }
|
| +
|
| + err = xformer.uploadErrorImage(*failureImage)
|
| + if err != nil {
|
| + // If we can't upload this, we can't upload anything.
|
| + glog.Fatalf("Filed to upload error image: %s", err)
|
| + }
|
| +
|
| + for _, rasterizer := range []pdf.Rasterizer{pdf.Pdfium{}, pdf.Poppler{}} {
|
| + if rasterizer.Enabled() {
|
| + xformer.rasterizers = append(xformer.rasterizers, rasterizer)
|
| + } else {
|
| + glog.Infof("rasterizer %s is disabled", rasterizer.String())
|
| + }
|
| + }
|
| + if len(xformer.rasterizers) == 0 {
|
| + glog.Fatalf("no rasterizers found")
|
| + }
|
| +
|
| + end := time.Now()
|
| + start := end.Add(-172 * time.Hour)
|
| + xformer.processTimeRange(start, end)
|
| + glog.Flush() // Flush before waiting for next tick; it may be a while.
|
| + for _ = range time.Tick(time.Minute) {
|
| + start, end = end, time.Now()
|
| + xformer.processTimeRange(start, end)
|
| + glog.Flush()
|
| + }
|
| +}
|
|
|