OLD | NEW |
| (Empty) |
1 package db | |
2 | |
3 import ( | |
4 "bytes" | |
5 "encoding/gob" | |
6 "sort" | |
7 "sync" | |
8 "time" | |
9 | |
10 "github.com/satori/go.uuid" | |
11 "github.com/skia-dev/glog" | |
12 ) | |
13 | |
14 // ModifiedTasks allows subscribers to keep track of Tasks that have been | |
15 // modified. It implements StartTrackingModifiedTasks and GetModifiedTasks from | |
16 // the DB interface. | |
17 type ModifiedTasks struct { | |
18 // map[subscriber_id][task_id]task_gob | |
19 tasks map[string]map[string][]byte | |
20 // After the expiration time, subscribers are automatically removed. | |
21 expiration map[string]time.Time | |
22 // Protects tasks and expiration. | |
23 mtx sync.RWMutex | |
24 } | |
25 | |
26 // See docs for DB interface. | |
27 func (m *ModifiedTasks) GetModifiedTasks(id string) ([]*Task, error) { | |
28 m.mtx.Lock() | |
29 defer m.mtx.Unlock() | |
30 if _, ok := m.expiration[id]; !ok { | |
31 return nil, ErrUnknownId | |
32 } | |
33 d := TaskDecoder{} | |
34 for _, g := range m.tasks[id] { | |
35 if !d.Process(g) { | |
36 break | |
37 } | |
38 } | |
39 rv, err := d.Result() | |
40 if err != nil { | |
41 return nil, err | |
42 } | |
43 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) | |
44 delete(m.tasks, id) | |
45 sort.Sort(TaskSlice(rv)) | |
46 return rv, nil | |
47 } | |
48 | |
49 // clearExpiredSubscribers periodically deletes data about any subscribers that | |
50 // haven't been seen within MODIFIED_TASKS_TIMEOUT. Must be called as a | |
51 // goroutine. Returns when there are no remaining subscribers. | |
52 func (m *ModifiedTasks) clearExpiredSubscribers() { | |
53 ticker := time.NewTicker(time.Minute) | |
54 for _ = range ticker.C { | |
55 m.mtx.Lock() | |
56 for id, t := range m.expiration { | |
57 if time.Now().After(t) { | |
58 glog.Warningf("Deleting expired subscriber with
id %s; expiration time %s.", id, t) | |
59 delete(m.tasks, id) | |
60 delete(m.expiration, id) | |
61 } | |
62 } | |
63 anyLeft := len(m.expiration) > 0 | |
64 if !anyLeft { | |
65 m.tasks = nil | |
66 m.expiration = nil | |
67 } | |
68 m.mtx.Unlock() | |
69 if !anyLeft { | |
70 break | |
71 } | |
72 } | |
73 ticker.Stop() | |
74 } | |
75 | |
76 // TrackModifiedTask indicates the given Task should be returned from the next | |
77 // call to GetModifiedTasks from each subscriber. | |
78 func (m *ModifiedTasks) TrackModifiedTask(t *Task) { | |
79 var buf bytes.Buffer | |
80 if err := gob.NewEncoder(&buf).Encode(t); err != nil { | |
81 glog.Fatal(err) | |
82 } | |
83 m.TrackModifiedTasksGOB(map[string][]byte{t.Id: buf.Bytes()}) | |
84 } | |
85 | |
86 // TrackModifiedTasksGOB is a batch, GOB version of TrackModifiedTask. Given a | |
87 // map from Task.Id to GOB-encoded task, it is equivalent to GOB-decoding each | |
88 // value of gobs as a Task and calling TrackModifiedTask on each one. Values of | |
89 // gobs must not be modified after this call. | |
90 func (m *ModifiedTasks) TrackModifiedTasksGOB(gobs map[string][]byte) { | |
91 m.mtx.Lock() | |
92 defer m.mtx.Unlock() | |
93 for subId, _ := range m.expiration { | |
94 sub, ok := m.tasks[subId] | |
95 if !ok { | |
96 sub = make(map[string][]byte, len(gobs)) | |
97 m.tasks[subId] = sub | |
98 } | |
99 for taskId, gob := range gobs { | |
100 sub[taskId] = gob | |
101 } | |
102 } | |
103 } | |
104 | |
105 // See docs for DB interface. | |
106 func (m *ModifiedTasks) StartTrackingModifiedTasks() (string, error) { | |
107 m.mtx.Lock() | |
108 defer m.mtx.Unlock() | |
109 if m.expiration == nil { | |
110 // Initialize the data structure and start expiration goroutine. | |
111 m.tasks = map[string]map[string][]byte{} | |
112 m.expiration = map[string]time.Time{} | |
113 go m.clearExpiredSubscribers() | |
114 } else if len(m.expiration) >= MAX_MODIFIED_TASKS_USERS { | |
115 return "", ErrTooManyUsers | |
116 } | |
117 id := uuid.NewV5(uuid.NewV1(), uuid.NewV4().String()).String() | |
118 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) | |
119 return id, nil | |
120 } | |
121 | |
122 // See docs for DB interface. | |
123 func (m *ModifiedTasks) StopTrackingModifiedTasks(id string) { | |
124 m.mtx.Lock() | |
125 defer m.mtx.Unlock() | |
126 delete(m.tasks, id) | |
127 delete(m.expiration, id) | |
128 } | |
OLD | NEW |