| OLD | NEW |
| (Empty) |
| 1 package db | |
| 2 | |
| 3 import ( | |
| 4 "bytes" | |
| 5 "encoding/gob" | |
| 6 "sort" | |
| 7 "sync" | |
| 8 "time" | |
| 9 | |
| 10 "github.com/satori/go.uuid" | |
| 11 "github.com/skia-dev/glog" | |
| 12 ) | |
| 13 | |
| 14 // ModifiedTasks allows subscribers to keep track of Tasks that have been | |
| 15 // modified. It implements StartTrackingModifiedTasks and GetModifiedTasks from | |
| 16 // the DB interface. | |
| 17 type ModifiedTasks struct { | |
| 18 // map[subscriber_id][task_id]task_gob | |
| 19 tasks map[string]map[string][]byte | |
| 20 // After the expiration time, subscribers are automatically removed. | |
| 21 expiration map[string]time.Time | |
| 22 // Protects tasks and expiration. | |
| 23 mtx sync.RWMutex | |
| 24 } | |
| 25 | |
| 26 // See docs for DB interface. | |
| 27 func (m *ModifiedTasks) GetModifiedTasks(id string) ([]*Task, error) { | |
| 28 m.mtx.Lock() | |
| 29 defer m.mtx.Unlock() | |
| 30 if _, ok := m.expiration[id]; !ok { | |
| 31 return nil, ErrUnknownId | |
| 32 } | |
| 33 d := TaskDecoder{} | |
| 34 for _, g := range m.tasks[id] { | |
| 35 if !d.Process(g) { | |
| 36 break | |
| 37 } | |
| 38 } | |
| 39 rv, err := d.Result() | |
| 40 if err != nil { | |
| 41 return nil, err | |
| 42 } | |
| 43 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) | |
| 44 delete(m.tasks, id) | |
| 45 sort.Sort(TaskSlice(rv)) | |
| 46 return rv, nil | |
| 47 } | |
| 48 | |
| 49 // clearExpiredSubscribers periodically deletes data about any subscribers that | |
| 50 // haven't been seen within MODIFIED_TASKS_TIMEOUT. Must be called as a | |
| 51 // goroutine. Returns when there are no remaining subscribers. | |
| 52 func (m *ModifiedTasks) clearExpiredSubscribers() { | |
| 53 ticker := time.NewTicker(time.Minute) | |
| 54 for _ = range ticker.C { | |
| 55 m.mtx.Lock() | |
| 56 for id, t := range m.expiration { | |
| 57 if time.Now().After(t) { | |
| 58 glog.Warningf("Deleting expired subscriber with
id %s; expiration time %s.", id, t) | |
| 59 delete(m.tasks, id) | |
| 60 delete(m.expiration, id) | |
| 61 } | |
| 62 } | |
| 63 anyLeft := len(m.expiration) > 0 | |
| 64 if !anyLeft { | |
| 65 m.tasks = nil | |
| 66 m.expiration = nil | |
| 67 } | |
| 68 m.mtx.Unlock() | |
| 69 if !anyLeft { | |
| 70 break | |
| 71 } | |
| 72 } | |
| 73 ticker.Stop() | |
| 74 } | |
| 75 | |
| 76 // TrackModifiedTask indicates the given Task should be returned from the next | |
| 77 // call to GetModifiedTasks from each subscriber. | |
| 78 func (m *ModifiedTasks) TrackModifiedTask(t *Task) { | |
| 79 var buf bytes.Buffer | |
| 80 if err := gob.NewEncoder(&buf).Encode(t); err != nil { | |
| 81 glog.Fatal(err) | |
| 82 } | |
| 83 m.TrackModifiedTasksGOB(map[string][]byte{t.Id: buf.Bytes()}) | |
| 84 } | |
| 85 | |
| 86 // TrackModifiedTasksGOB is a batch, GOB version of TrackModifiedTask. Given a | |
| 87 // map from Task.Id to GOB-encoded task, it is equivalent to GOB-decoding each | |
| 88 // value of gobs as a Task and calling TrackModifiedTask on each one. Values of | |
| 89 // gobs must not be modified after this call. | |
| 90 func (m *ModifiedTasks) TrackModifiedTasksGOB(gobs map[string][]byte) { | |
| 91 m.mtx.Lock() | |
| 92 defer m.mtx.Unlock() | |
| 93 for subId, _ := range m.expiration { | |
| 94 sub, ok := m.tasks[subId] | |
| 95 if !ok { | |
| 96 sub = make(map[string][]byte, len(gobs)) | |
| 97 m.tasks[subId] = sub | |
| 98 } | |
| 99 for taskId, gob := range gobs { | |
| 100 sub[taskId] = gob | |
| 101 } | |
| 102 } | |
| 103 } | |
| 104 | |
| 105 // See docs for DB interface. | |
| 106 func (m *ModifiedTasks) StartTrackingModifiedTasks() (string, error) { | |
| 107 m.mtx.Lock() | |
| 108 defer m.mtx.Unlock() | |
| 109 if m.expiration == nil { | |
| 110 // Initialize the data structure and start expiration goroutine. | |
| 111 m.tasks = map[string]map[string][]byte{} | |
| 112 m.expiration = map[string]time.Time{} | |
| 113 go m.clearExpiredSubscribers() | |
| 114 } else if len(m.expiration) >= MAX_MODIFIED_TASKS_USERS { | |
| 115 return "", ErrTooManyUsers | |
| 116 } | |
| 117 id := uuid.NewV5(uuid.NewV1(), uuid.NewV4().String()).String() | |
| 118 m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) | |
| 119 return id, nil | |
| 120 } | |
| 121 | |
| 122 // See docs for DB interface. | |
| 123 func (m *ModifiedTasks) StopTrackingModifiedTasks(id string) { | |
| 124 m.mtx.Lock() | |
| 125 defer m.mtx.Unlock() | |
| 126 delete(m.tasks, id) | |
| 127 delete(m.expiration, id) | |
| 128 } | |
| OLD | NEW |