Index: build_scheduler/go/db/modified_tasks.go |
diff --git a/build_scheduler/go/db/modified_tasks.go b/build_scheduler/go/db/modified_tasks.go |
deleted file mode 100644 |
index b76dd9221e30485eb1c04bc3cd896bc485310d21..0000000000000000000000000000000000000000 |
--- a/build_scheduler/go/db/modified_tasks.go |
+++ /dev/null |
@@ -1,128 +0,0 @@ |
-package db |
- |
-import ( |
- "bytes" |
- "encoding/gob" |
- "sort" |
- "sync" |
- "time" |
- |
- "github.com/satori/go.uuid" |
- "github.com/skia-dev/glog" |
-) |
- |
-// ModifiedTasks allows subscribers to keep track of Tasks that have been |
-// modified. It implements StartTrackingModifiedTasks and GetModifiedTasks from |
-// the DB interface. |
-type ModifiedTasks struct { |
- // map[subscriber_id][task_id]task_gob |
- tasks map[string]map[string][]byte |
- // After the expiration time, subscribers are automatically removed. |
- expiration map[string]time.Time |
- // Protects tasks and expiration. |
- mtx sync.RWMutex |
-} |
- |
-// See docs for DB interface. |
-func (m *ModifiedTasks) GetModifiedTasks(id string) ([]*Task, error) { |
- m.mtx.Lock() |
- defer m.mtx.Unlock() |
- if _, ok := m.expiration[id]; !ok { |
- return nil, ErrUnknownId |
- } |
- d := TaskDecoder{} |
- for _, g := range m.tasks[id] { |
- if !d.Process(g) { |
- break |
- } |
- } |
- rv, err := d.Result() |
- if err != nil { |
- return nil, err |
- } |
- m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) |
- delete(m.tasks, id) |
- sort.Sort(TaskSlice(rv)) |
- return rv, nil |
-} |
- |
-// clearExpiredSubscribers periodically deletes data about any subscribers that |
-// haven't been seen within MODIFIED_TASKS_TIMEOUT. Must be called as a |
-// goroutine. Returns when there are no remaining subscribers. |
-func (m *ModifiedTasks) clearExpiredSubscribers() { |
- ticker := time.NewTicker(time.Minute) |
- for _ = range ticker.C { |
- m.mtx.Lock() |
- for id, t := range m.expiration { |
- if time.Now().After(t) { |
- glog.Warningf("Deleting expired subscriber with id %s; expiration time %s.", id, t) |
- delete(m.tasks, id) |
- delete(m.expiration, id) |
- } |
- } |
- anyLeft := len(m.expiration) > 0 |
- if !anyLeft { |
- m.tasks = nil |
- m.expiration = nil |
- } |
- m.mtx.Unlock() |
- if !anyLeft { |
- break |
- } |
- } |
- ticker.Stop() |
-} |
- |
-// TrackModifiedTask indicates the given Task should be returned from the next |
-// call to GetModifiedTasks from each subscriber. |
-func (m *ModifiedTasks) TrackModifiedTask(t *Task) { |
- var buf bytes.Buffer |
- if err := gob.NewEncoder(&buf).Encode(t); err != nil { |
- glog.Fatal(err) |
- } |
- m.TrackModifiedTasksGOB(map[string][]byte{t.Id: buf.Bytes()}) |
-} |
- |
-// TrackModifiedTasksGOB is a batch, GOB version of TrackModifiedTask. Given a |
-// map from Task.Id to GOB-encoded task, it is equivalent to GOB-decoding each |
-// value of gobs as a Task and calling TrackModifiedTask on each one. Values of |
-// gobs must not be modified after this call. |
-func (m *ModifiedTasks) TrackModifiedTasksGOB(gobs map[string][]byte) { |
- m.mtx.Lock() |
- defer m.mtx.Unlock() |
- for subId, _ := range m.expiration { |
- sub, ok := m.tasks[subId] |
- if !ok { |
- sub = make(map[string][]byte, len(gobs)) |
- m.tasks[subId] = sub |
- } |
- for taskId, gob := range gobs { |
- sub[taskId] = gob |
- } |
- } |
-} |
- |
-// See docs for DB interface. |
-func (m *ModifiedTasks) StartTrackingModifiedTasks() (string, error) { |
- m.mtx.Lock() |
- defer m.mtx.Unlock() |
- if m.expiration == nil { |
- // Initialize the data structure and start expiration goroutine. |
- m.tasks = map[string]map[string][]byte{} |
- m.expiration = map[string]time.Time{} |
- go m.clearExpiredSubscribers() |
- } else if len(m.expiration) >= MAX_MODIFIED_TASKS_USERS { |
- return "", ErrTooManyUsers |
- } |
- id := uuid.NewV5(uuid.NewV1(), uuid.NewV4().String()).String() |
- m.expiration[id] = time.Now().Add(MODIFIED_TASKS_TIMEOUT) |
- return id, nil |
-} |
- |
-// See docs for DB interface. |
-func (m *ModifiedTasks) StopTrackingModifiedTasks(id string) { |
- m.mtx.Lock() |
- defer m.mtx.Unlock() |
- delete(m.tasks, id) |
- delete(m.expiration, id) |
-} |