| OLD | NEW |
| 1 // Copyright 2016 The LUCI Authors. | 1 // Copyright 2016 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 15 matching lines...) Expand all Loading... |
| 26 "github.com/luci/luci-go/common/isolatedclient" | 26 "github.com/luci/luci-go/common/isolatedclient" |
| 27 ) | 27 ) |
| 28 | 28 |
| 29 // isolateService is an internal interface to allow mocking of the | 29 // isolateService is an internal interface to allow mocking of the |
| 30 // isolatedclient.Client. | 30 // isolatedclient.Client. |
| 31 type isolateService interface { | 31 type isolateService interface { |
| 32 Contains(context.Context, []*service.HandlersEndpointsV1Digest) ([]*isol
atedclient.PushState, error) | 32 Contains(context.Context, []*service.HandlersEndpointsV1Digest) ([]*isol
atedclient.PushState, error) |
| 33 Push(context.Context, *isolatedclient.PushState, isolatedclient.Source)
error | 33 Push(context.Context, *isolatedclient.PushState, isolatedclient.Source)
error |
| 34 } | 34 } |
| 35 | 35 |
| 36 // A Checker checks whether items are available on the server. |
| 37 // It has a single implementation, *BundlingChecker. See BundlingChecker for met
hod documentation. |
| 38 type Checker interface { |
| 39 AddItem(item *Item, isolated bool, callback CheckerCallback) |
| 40 Close() error |
| 41 } |
| 42 |
| 36 // CheckerCallback is the callback used by Checker to indicate whether a file is | 43 // CheckerCallback is the callback used by Checker to indicate whether a file is |
| 37 // present on the isolate server. If the item not present, the callback will be | 44 // present on the isolate server. If the item not present, the callback will be |
| 38 // include the PushState necessary to upload it. Otherwise, the PushState will | 45 // include the PushState necessary to upload it. Otherwise, the PushState will |
| 39 // be nil. | 46 // be nil. |
| 40 type CheckerCallback func(*Item, *isolatedclient.PushState) | 47 type CheckerCallback func(*Item, *isolatedclient.PushState) |
| 41 | 48 |
| 42 type checkerItem struct { | 49 type checkerItem struct { |
| 43 item *Item | 50 item *Item |
| 44 isolated bool | 51 isolated bool |
| 45 callback CheckerCallback | 52 callback CheckerCallback |
| 46 } | 53 } |
| 47 | 54 |
| 48 // Checker uses the isolatedclient.Client to check whether items are available | 55 // BundlingChecker uses the isolatedclient.Client to check whether items are ava
ilable |
| 49 // on the server. | 56 // on the server. |
| 50 // Checker methods are safe to call concurrently. | 57 // BundlingChecker methods are safe to call concurrently. |
| 51 type Checker struct { | 58 type BundlingChecker struct { |
| 52 ctx context.Context | 59 ctx context.Context |
| 53 svc isolateService | 60 svc isolateService |
| 54 bundler *bundler.Bundler | 61 bundler *bundler.Bundler |
| 55 err error | 62 err error |
| 56 | 63 |
| 57 Hit, Miss CountBytes | 64 Hit, Miss CountBytes |
| 58 } | 65 } |
| 59 | 66 |
| 60 // CountBytes aggregates a count of files and the number of bytes in them. | 67 // CountBytes aggregates a count of files and the number of bytes in them. |
| 61 type CountBytes struct { | 68 type CountBytes struct { |
| 62 Count int | 69 Count int |
| 63 Bytes int64 | 70 Bytes int64 |
| 64 } | 71 } |
| 65 | 72 |
| 66 func (cb *CountBytes) addFile(size int64) { | 73 func (cb *CountBytes) addFile(size int64) { |
| 67 cb.Count++ | 74 cb.Count++ |
| 68 cb.Bytes += size | 75 cb.Bytes += size |
| 69 } | 76 } |
| 70 | 77 |
| 71 // NewChecker creates a NewChecker with the given isolated client. | 78 // NewChecker creates a new Checker with the given isolated client. |
| 72 // The provided context is used to make all requests to the isolate server. | 79 // The provided context is used to make all requests to the isolate server. |
| 73 func NewChecker(ctx context.Context, client *isolatedclient.Client) *Checker { | 80 func NewChecker(ctx context.Context, client *isolatedclient.Client) *BundlingChe
cker { |
| 74 return newChecker(ctx, client) | 81 return newChecker(ctx, client) |
| 75 } | 82 } |
| 76 | 83 |
| 77 func newChecker(ctx context.Context, svc isolateService) *Checker { | 84 func newChecker(ctx context.Context, svc isolateService) *BundlingChecker { |
| 78 » c := &Checker{ | 85 » c := &BundlingChecker{ |
| 79 svc: svc, | 86 svc: svc, |
| 80 ctx: ctx, | 87 ctx: ctx, |
| 81 } | 88 } |
| 82 c.bundler = bundler.NewBundler(checkerItem{}, func(bundle interface{}) { | 89 c.bundler = bundler.NewBundler(checkerItem{}, func(bundle interface{}) { |
| 83 items := bundle.([]checkerItem) | 90 items := bundle.([]checkerItem) |
| 84 if c.err != nil { | 91 if c.err != nil { |
| 85 for _, item := range items { | 92 for _, item := range items { |
| 86 // Drop any more incoming items. | 93 // Drop any more incoming items. |
| 87 log.Printf("WARNING dropped %q from Checker", it
em.item.Path) | 94 log.Printf("WARNING dropped %q from Checker", it
em.item.Path) |
| 88 } | 95 } |
| 89 return | 96 return |
| 90 } | 97 } |
| 91 c.err = c.check(items) | 98 c.err = c.check(items) |
| 92 }) | 99 }) |
| 93 c.bundler.DelayThreshold = 50 * time.Millisecond | 100 c.bundler.DelayThreshold = 50 * time.Millisecond |
| 94 c.bundler.BundleCountThreshold = 50 | 101 c.bundler.BundleCountThreshold = 50 |
| 95 return c | 102 return c |
| 96 } | 103 } |
| 97 | 104 |
| 98 // AddItem adds the given item to the checker for testing, and invokes the provi
ded | 105 // AddItem adds the given item to the checker for testing, and invokes the provi
ded |
| 99 // callback asynchronously. The isolated param indicates whether the given item | 106 // callback asynchronously. The isolated param indicates whether the given item |
| 100 // represents a JSON isolated manifest (as opposed to a regular file). | 107 // represents a JSON isolated manifest (as opposed to a regular file). |
| 101 // In the case of an error, the callback may never be invoked. | 108 // In the case of an error, the callback may never be invoked. |
| 102 func (c *Checker) AddItem(item *Item, isolated bool, callback CheckerCallback) { | 109 func (c *BundlingChecker) AddItem(item *Item, isolated bool, callback CheckerCal
lback) { |
| 103 if err := c.bundler.Add(checkerItem{item, isolated, callback}, 0); err !
= nil { | 110 if err := c.bundler.Add(checkerItem{item, isolated, callback}, 0); err !
= nil { |
| 104 // An error is only returned if the size is too big, but we alwa
ys use | 111 // An error is only returned if the size is too big, but we alwa
ys use |
| 105 // zero size so no error is possible. | 112 // zero size so no error is possible. |
| 106 panic(err) | 113 panic(err) |
| 107 } | 114 } |
| 108 } | 115 } |
| 109 | 116 |
| 110 // Close shuts down the checker, blocking until all pending items have been | 117 // Close shuts down the checker, blocking until all pending items have been |
| 111 // checked with the server. Close returns the first error encountered during | 118 // checked with the server. Close returns the first error encountered during |
| 112 // the checking process, if any. | 119 // the checking process, if any. |
| 113 // After Close has returned, Checker is guaranteed to no longer invoke any | 120 // After Close has returned, Checker is guaranteed to no longer invoke any |
| 114 // previously-provided callback. | 121 // previously-provided callback. |
| 115 func (c *Checker) Close() error { | 122 func (c *BundlingChecker) Close() error { |
| 116 c.bundler.Flush() | 123 c.bundler.Flush() |
| 117 // After Close has returned, we know there are no outstanding running | 124 // After Close has returned, we know there are no outstanding running |
| 118 // checks. | 125 // checks. |
| 119 return c.err | 126 return c.err |
| 120 } | 127 } |
| 121 | 128 |
| 122 // check is invoked from the bundler's handler. As such, it is only ever run | 129 // check is invoked from the bundler's handler. As such, it is only ever run |
| 123 // one invocation at a time. | 130 // one invocation at a time. |
| 124 func (c *Checker) check(items []checkerItem) error { | 131 func (c *BundlingChecker) check(items []checkerItem) error { |
| 125 var digests []*service.HandlersEndpointsV1Digest | 132 var digests []*service.HandlersEndpointsV1Digest |
| 126 for _, item := range items { | 133 for _, item := range items { |
| 127 digests = append(digests, &service.HandlersEndpointsV1Digest{ | 134 digests = append(digests, &service.HandlersEndpointsV1Digest{ |
| 128 Digest: string(item.item.Digest), | 135 Digest: string(item.item.Digest), |
| 129 Size: item.item.Size, | 136 Size: item.item.Size, |
| 130 IsIsolated: item.isolated, | 137 IsIsolated: item.isolated, |
| 131 }) | 138 }) |
| 132 } | 139 } |
| 133 out, err := c.svc.Contains(c.ctx, digests) | 140 out, err := c.svc.Contains(c.ctx, digests) |
| 134 if err != nil { | 141 if err != nil { |
| 135 // TODO(djd): propogate this more cleanly. At the moment, droppi
ng | 142 // TODO(djd): propogate this more cleanly. At the moment, droppi
ng |
| 136 // callbacks may cause the TAR archiver to hang. | 143 // callbacks may cause the TAR archiver to hang. |
| 137 log.Printf("ERROR: isolate Contains call failed: %v", err) | 144 log.Printf("ERROR: isolate Contains call failed: %v", err) |
| 138 os.Exit(infraFailExit) | 145 os.Exit(infraFailExit) |
| 139 return err | 146 return err |
| 140 } | 147 } |
| 141 for i, item := range items { | 148 for i, item := range items { |
| 142 if size := item.item.Size; out[i] == nil { | 149 if size := item.item.Size; out[i] == nil { |
| 143 c.Hit.addFile(size) | 150 c.Hit.addFile(size) |
| 144 } else { | 151 } else { |
| 145 c.Miss.addFile(size) | 152 c.Miss.addFile(size) |
| 146 } | 153 } |
| 147 | 154 |
| 148 item.callback(item.item, out[i]) | 155 item.callback(item.item, out[i]) |
| 149 } | 156 } |
| 150 return nil | 157 return nil |
| 151 } | 158 } |
| OLD | NEW |