Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(248)

Side by Side Diff: build_scheduler/go/db/modified_tasks.go

Issue 2296763008: [task scheduler] Move files from build_scheduler/ to task_scheduler/ (Closed) Base URL: https://skia.googlesource.com/buildbot@master
Patch Set: Created 4 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « build_scheduler/go/db/memory_test.go ('k') | build_scheduler/go/db/modified_tasks_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 package db
2
3 import (
4 "bytes"
5 "encoding/gob"
6 "sort"
7 "sync"
8 "time"
9
10 "github.com/satori/go.uuid"
11 "github.com/skia-dev/glog"
12 )
13
14 // ModifiedTasks allows subscribers to keep track of Tasks that have been
15 // modified. It implements StartTrackingModifiedTasks and GetModifiedTasks from
16 // the DB interface.
17 type ModifiedTasks struct {
18 // map[subscriber_id][task_id]task_gob
19 tasks map[string]map[string][]byte
20 // After the expiration time, subscribers are automatically removed.
21 expiration map[string]time.Time
22 // Protects tasks and expiration.
23 mtx sync.RWMutex
24 }
25
26 // See docs for DB interface.
27 func (m *ModifiedTasks) GetModifiedTasks(id string) ([]*Task, error) {
28 m.mtx.Lock()
29 defer m.mtx.Unlock()
30 if _, ok := m.expiration[id]; !ok {
31 return nil, ErrUnknownId
32 }
33 d := TaskDecoder{}
34 for _, g := range m.tasks[id] {
35 if !d.Process(g) {
36 break
37 }
38 }
39 rv, err := d.Result()
40 if err != nil {
41 return nil, err
42 }
43 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT)
44 delete(m.tasks, id)
45 sort.Sort(TaskSlice(rv))
46 return rv, nil
47 }
48
49 // clearExpiredSubscribers periodically deletes data about any subscribers that
50 // haven't been seen within MODIFIED_TASKS_TIMEOUT. Must be called as a
51 // goroutine. Returns when there are no remaining subscribers.
52 func (m *ModifiedTasks) clearExpiredSubscribers() {
53 ticker := time.NewTicker(time.Minute)
54 for _ = range ticker.C {
55 m.mtx.Lock()
56 for id, t := range m.expiration {
57 if time.Now().After(t) {
58 glog.Warningf("Deleting expired subscriber with id %s; expiration time %s.", id, t)
59 delete(m.tasks, id)
60 delete(m.expiration, id)
61 }
62 }
63 anyLeft := len(m.expiration) > 0
64 if !anyLeft {
65 m.tasks = nil
66 m.expiration = nil
67 }
68 m.mtx.Unlock()
69 if !anyLeft {
70 break
71 }
72 }
73 ticker.Stop()
74 }
75
76 // TrackModifiedTask indicates the given Task should be returned from the next
77 // call to GetModifiedTasks from each subscriber.
78 func (m *ModifiedTasks) TrackModifiedTask(t *Task) {
79 var buf bytes.Buffer
80 if err := gob.NewEncoder(&buf).Encode(t); err != nil {
81 glog.Fatal(err)
82 }
83 m.TrackModifiedTasksGOB(map[string][]byte{t.Id: buf.Bytes()})
84 }
85
86 // TrackModifiedTasksGOB is a batch, GOB version of TrackModifiedTask. Given a
87 // map from Task.Id to GOB-encoded task, it is equivalent to GOB-decoding each
88 // value of gobs as a Task and calling TrackModifiedTask on each one. Values of
89 // gobs must not be modified after this call.
90 func (m *ModifiedTasks) TrackModifiedTasksGOB(gobs map[string][]byte) {
91 m.mtx.Lock()
92 defer m.mtx.Unlock()
93 for subId, _ := range m.expiration {
94 sub, ok := m.tasks[subId]
95 if !ok {
96 sub = make(map[string][]byte, len(gobs))
97 m.tasks[subId] = sub
98 }
99 for taskId, gob := range gobs {
100 sub[taskId] = gob
101 }
102 }
103 }
104
105 // See docs for DB interface.
106 func (m *ModifiedTasks) StartTrackingModifiedTasks() (string, error) {
107 m.mtx.Lock()
108 defer m.mtx.Unlock()
109 if m.expiration == nil {
110 // Initialize the data structure and start expiration goroutine.
111 m.tasks = map[string]map[string][]byte{}
112 m.expiration = map[string]time.Time{}
113 go m.clearExpiredSubscribers()
114 } else if len(m.expiration) >= MAX_MODIFIED_TASKS_USERS {
115 return "", ErrTooManyUsers
116 }
117 id := uuid.NewV5(uuid.NewV1(), uuid.NewV4().String()).String()
118 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT)
119 return id, nil
120 }
121
122 // See docs for DB interface.
123 func (m *ModifiedTasks) StopTrackingModifiedTasks(id string) {
124 m.mtx.Lock()
125 defer m.mtx.Unlock()
126 delete(m.tasks, id)
127 delete(m.expiration, id)
128 }
OLDNEW
« no previous file with comments | « build_scheduler/go/db/memory_test.go ('k') | build_scheduler/go/db/modified_tasks_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698