Index: src/d8.cc |
diff --git a/src/d8.cc b/src/d8.cc |
index 7a1f779f70b5fecb48c5e0dd88a63aa40da49996..6c3505a69eae4805a51baccb827abec8cc0bed3e 100644 |
--- a/src/d8.cc |
+++ b/src/d8.cc |
@@ -205,6 +205,8 @@ base::Mutex Shell::context_mutex_; |
const base::TimeTicks Shell::kInitialTicks = |
base::TimeTicks::HighResolutionNow(); |
Persistent<Context> Shell::utility_context_; |
+base::Mutex Shell::workers_mutex_; |
+bool Shell::allow_new_workers_ = true; |
i::List<Worker*> Shell::workers_; |
i::List<SharedArrayBuffer::Contents> Shell::externalized_shared_contents_; |
#endif // !V8_SHARED |
@@ -693,12 +695,17 @@ void Shell::WorkerNew(const v8::FunctionCallbackInfo<v8::Value>& args) { |
return; |
} |
- Worker* worker = new Worker; |
- args.This()->SetInternalField(0, External::New(isolate, worker)); |
- workers_.Add(worker); |
+ { |
+ base::LockGuard<base::Mutex> lock_guard(&workers_mutex_); |
+ if (!allow_new_workers_) return; |
+ |
+ Worker* worker = new Worker; |
+ args.This()->SetInternalField(0, External::New(isolate, worker)); |
+ workers_.Add(worker); |
- String::Utf8Value function_string(args[0]->ToString()); |
- worker->StartExecuteInThread(isolate, *function_string); |
+ String::Utf8Value function_string(args[0]->ToString()); |
+ worker->StartExecuteInThread(isolate, *function_string); |
+ } |
} |
@@ -793,8 +800,6 @@ void Shell::WorkerTerminate(const v8::FunctionCallbackInfo<v8::Value>& args) { |
Worker* worker = |
static_cast<Worker*>(Local<External>::Cast(this_value)->Value()); |
worker->Terminate(); |
- workers_.RemoveElement(worker); |
- delete worker; |
} |
#endif // !V8_SHARED |
@@ -1620,7 +1625,11 @@ void SerializationDataQueue::Clear() { |
Worker::Worker() |
- : in_semaphore_(0), out_semaphore_(0), thread_(NULL), script_(NULL) {} |
+ : in_semaphore_(0), |
+ out_semaphore_(0), |
+ thread_(NULL), |
+ script_(NULL), |
+ state_(IDLE) {} |
Worker::~Worker() { Cleanup(); } |
@@ -1628,11 +1637,7 @@ Worker::~Worker() { Cleanup(); } |
void Worker::StartExecuteInThread(Isolate* isolate, |
const char* function_string) { |
- if (thread_) { |
- Throw(isolate, "Only one worker allowed"); |
- return; |
- } |
- |
+ DCHECK(base::NoBarrier_Load(&state_) == IDLE); |
static const char format[] = "(%s).call(this);"; |
size_t len = strlen(function_string) + sizeof(format); |
@@ -1640,6 +1645,7 @@ void Worker::StartExecuteInThread(Isolate* isolate, |
i::Vector<char> vec(script_, static_cast<int>(len + 1)); |
i::SNPrintF(vec, format, function_string); |
+ base::NoBarrier_Store(&state_, RUNNING); |
thread_ = new WorkerThread(this); |
thread_->Start(); |
} |
@@ -1652,8 +1658,9 @@ void Worker::PostMessage(SerializationData* data) { |
SerializationData* Worker::GetMessage() { |
- SerializationData* data; |
+ SerializationData* data = NULL; |
while (!out_queue_.Dequeue(&data)) { |
+ if (base::NoBarrier_Load(&state_) != RUNNING) break; |
out_semaphore_.Wait(); |
} |
@@ -1662,10 +1669,11 @@ SerializationData* Worker::GetMessage() { |
void Worker::Terminate() { |
- if (thread_ == NULL) return; |
- PostMessage(NULL); |
- thread_->Join(); |
- Cleanup(); |
+ if (base::NoBarrier_CompareAndSwap(&state_, RUNNING, TERMINATED) == RUNNING) { |
+ // Post NULL to wake the Worker thread message loop. |
+ PostMessage(NULL); |
+ thread_->Join(); |
+ } |
} |
@@ -1698,31 +1706,31 @@ void Worker::ExecuteInThread() { |
// First run the script |
Handle<String> file_name = String::NewFromUtf8(isolate, "unnamed"); |
Handle<String> source = String::NewFromUtf8(isolate, script_); |
- Shell::ExecuteString(isolate, source, file_name, false, true); |
- |
- // Get the message handler |
- Handle<Value> onmessage = |
- global->Get(String::NewFromUtf8(isolate, "onmessage")); |
- if (onmessage->IsFunction()) { |
- Handle<Function> onmessage_fun = Handle<Function>::Cast(onmessage); |
- // Now wait for messages |
- bool done = false; |
- while (!done) { |
- in_semaphore_.Wait(); |
- SerializationData* data; |
- if (!in_queue_.Dequeue(&data)) continue; |
- if (data == NULL) { |
- done = true; |
- break; |
+ if (Shell::ExecuteString(isolate, source, file_name, true, true)) { |
+ // Get the message handler |
+ Handle<Value> onmessage = |
+ global->Get(String::NewFromUtf8(isolate, "onmessage")); |
+ if (onmessage->IsFunction()) { |
+ Handle<Function> onmessage_fun = Handle<Function>::Cast(onmessage); |
+ // Now wait for messages |
+ bool done = false; |
+ while (!done) { |
+ in_semaphore_.Wait(); |
+ SerializationData* data; |
+ if (!in_queue_.Dequeue(&data)) continue; |
+ if (data == NULL) { |
+ done = true; |
+ break; |
+ } |
+ int offset = 0; |
+ Local<Value> data_value; |
+ if (Shell::DeserializeValue(isolate, *data, &offset) |
+ .ToLocal(&data_value)) { |
+ Handle<Value> argv[] = {data_value}; |
+ (void)onmessage_fun->Call(context, global, 1, argv); |
+ } |
+ delete data; |
} |
- int offset = 0; |
- Local<Value> data_value; |
- if (Shell::DeserializeValue(isolate, *data, &offset) |
- .ToLocal(&data_value)) { |
- Handle<Value> argv[] = {data_value}; |
- (void)onmessage_fun->Call(context, global, 1, argv); |
- } |
- delete data; |
} |
} |
} |
@@ -1730,6 +1738,12 @@ void Worker::ExecuteInThread() { |
Shell::CollectGarbage(isolate); |
} |
isolate->Dispose(); |
+ |
+ if (base::NoBarrier_CompareAndSwap(&state_, RUNNING, TERMINATED) == RUNNING) { |
+ // Post NULL to wake the thread waiting on GetMessage() if there is one. |
+ out_queue_.Enqueue(NULL); |
+ out_semaphore_.Signal(); |
+ } |
} |
@@ -2192,12 +2206,28 @@ MaybeLocal<Value> Shell::DeserializeValue(Isolate* isolate, |
void Shell::CleanupWorkers() { |
- for (int i = 0; i < workers_.length(); ++i) { |
- Worker* worker = workers_[i]; |
+ // Make a copy of workers_, because we don't want to call Worker::Terminate |
+ // while holding the workers_mutex_ lock. Otherwise, if a worker is about to |
+ // create a new Worker, it would deadlock. |
+ i::List<Worker*> workers_copy; |
+ { |
+ base::LockGuard<base::Mutex> lock_guard(&workers_mutex_); |
+ allow_new_workers_ = false; |
+ workers_copy.AddAll(workers_); |
+ workers_.Clear(); |
+ } |
+ |
+ for (int i = 0; i < workers_copy.length(); ++i) { |
+ Worker* worker = workers_copy[i]; |
worker->Terminate(); |
delete worker; |
} |
- workers_.Clear(); |
+ |
+ // Now that all workers are terminated, we can re-enable Worker creation. |
+ { |
+ base::LockGuard<base::Mutex> lock_guard(&workers_mutex_); |
+ allow_new_workers_ = true; |
+ } |
for (int i = 0; i < externalized_shared_contents_.length(); ++i) { |
const SharedArrayBuffer::Contents& contents = |