| Index: runtime/bin/eventhandler_fuchsia.cc
 | 
| diff --git a/runtime/bin/eventhandler_fuchsia.cc b/runtime/bin/eventhandler_fuchsia.cc
 | 
| index 3d586ca034995a3f1ce01d1943768d7253d5d24b..71b366df645961f4ee51f9def2b4cd54ab87ba48 100644
 | 
| --- a/runtime/bin/eventhandler_fuchsia.cc
 | 
| +++ b/runtime/bin/eventhandler_fuchsia.cc
 | 
| @@ -10,14 +10,20 @@
 | 
|  #include "bin/eventhandler.h"
 | 
|  #include "bin/eventhandler_fuchsia.h"
 | 
|  
 | 
| -#include <errno.h>      // NOLINT
 | 
| -#include <fcntl.h>      // NOLINT
 | 
| -#include <pthread.h>    // NOLINT
 | 
| -#include <stdio.h>      // NOLINT
 | 
| -#include <string.h>     // NOLINT
 | 
| -#include <sys/epoll.h>  // NOLINT
 | 
| -#include <sys/stat.h>   // NOLINT
 | 
| -#include <unistd.h>     // NOLINT
 | 
| +#include <errno.h>
 | 
| +#include <fcntl.h>
 | 
| +#include <magenta/status.h>
 | 
| +#include <magenta/syscalls.h>
 | 
| +#include <magenta/syscalls/object.h>
 | 
| +#include <magenta/syscalls/port.h>
 | 
| +#include <mxio/private.h>
 | 
| +#include <pthread.h>
 | 
| +#include <stdio.h>
 | 
| +#include <string.h>
 | 
| +#include <sys/epoll.h>
 | 
| +#include <sys/socket.h>
 | 
| +#include <sys/stat.h>
 | 
| +#include <unistd.h>
 | 
|  
 | 
|  #include "bin/fdutils.h"
 | 
|  #include "bin/lockers.h"
 | 
| @@ -28,169 +34,293 @@
 | 
|  #include "platform/hashmap.h"
 | 
|  #include "platform/utils.h"
 | 
|  
 | 
| -// #define EVENTHANDLER_LOGGING 1
 | 
| -#if defined(EVENTHANDLER_LOGGING)
 | 
| -#define LOG_ERR(msg, ...) Log::PrintErr(msg, ##__VA_ARGS__)
 | 
| -#define LOG_INFO(msg, ...) Log::Print(msg, ##__VA_ARGS__)
 | 
| +// The EventHandler for Fuchsia uses its "ports v2" API:
 | 
| +// https://fuchsia.googlesource.com/magenta/+/HEAD/docs/syscalls/port_create.md
 | 
| +// This API does not have epoll()-like edge triggering (EPOLLET). Since clients
 | 
| +// of the EventHandler expect edge-triggered notifications, we must simulate it.
 | 
| +// When a packet from mx_port_wait() indicates that a signal is asserted for a
 | 
| +// handle, we unsubscribe from that signal until the event that asserted the
 | 
| +// signal can be processed. For example:
 | 
| +//
 | 
| +// 1. We get MX_SOCKET_WRITABLE from mx_port_wait() for a handle.
 | 
| +// 2. We send kOutEvent to the Dart thread.
 | 
| +// 3. We unsubscribe from further MX_SOCKET_WRITABLE signals for the handle.
 | 
| +// 4. Some time later the Dart thread actually does a write().
 | 
| +// 5. After writing, the Dart thread resubscribes to write events.
 | 
| +//
 | 
| +// We use he same procedure for MX_SOCKET_READABLE, and read()/accept().
 | 
| +
 | 
| +// define EVENTHANDLER_LOG_ERROR to get log messages only for errors.
 | 
| +// define EVENTHANDLER_LOG_INFO to get log messages for both information and
 | 
| +//   errors.
 | 
| +// #define EVENTHANDLER_LOG_INFO 1
 | 
| +#define EVENTHANDLER_LOG_ERROR 1
 | 
| +#if defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
 | 
| +#define LOG_ERR(msg, ...)                                                      \
 | 
| +  {                                                                            \
 | 
| +    int err = errno;                                                           \
 | 
| +    Log::PrintErr("Dart EventHandler ERROR: %s:%d: " msg, __FILE__, __LINE__,  \
 | 
| +                  ##__VA_ARGS__);                                              \
 | 
| +    errno = err;                                                               \
 | 
| +  }
 | 
| +#if defined(EVENTHANDLER_LOG_INFO)
 | 
| +#define LOG_INFO(msg, ...)                                                     \
 | 
| +  Log::Print("Dart EventHandler INFO: %s:%d: " msg, __FILE__, __LINE__,        \
 | 
| +             ##__VA_ARGS__)
 | 
| +#else
 | 
| +#define LOG_INFO(msg, ...)
 | 
| +#endif  // defined(EVENTHANDLER_LOG_INFO)
 | 
|  #else
 | 
|  #define LOG_ERR(msg, ...)
 | 
|  #define LOG_INFO(msg, ...)
 | 
| -#endif  // defined(EVENTHANDLER_LOGGING)
 | 
| +#endif  // defined(EVENTHANDLER_LOG_INFO) || defined(EVENTHANDLER_LOG_ERROR)
 | 
|  
 | 
|  namespace dart {
 | 
|  namespace bin {
 | 
|  
 | 
| -#if defined(EVENTHANDLER_LOGGING)
 | 
| -static void PrintEventMask(intptr_t fd, intptr_t events) {
 | 
| -  Log::PrintErr("%d ", fd);
 | 
| -  if ((events & EPOLLIN) != 0) {
 | 
| -    Log::PrintErr("EPOLLIN ");
 | 
| -  }
 | 
| -  if ((events & EPOLLPRI) != 0) {
 | 
| -    Log::PrintErr("EPOLLPRI ");
 | 
| -  }
 | 
| -  if ((events & EPOLLOUT) != 0) {
 | 
| -    Log::PrintErr("EPOLLOUT ");
 | 
| -  }
 | 
| -  if ((events & EPOLLERR) != 0) {
 | 
| -    Log::PrintErr("EPOLLERR ");
 | 
| -  }
 | 
| -  if ((events & EPOLLHUP) != 0) {
 | 
| -    Log::PrintErr("EPOLLHUP ");
 | 
| +intptr_t IOHandle::Read(void* buffer, intptr_t num_bytes) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  const ssize_t read_bytes = NO_RETRY_EXPECTED(read(fd_, buffer, num_bytes));
 | 
| +  const int err = errno;
 | 
| +  LOG_INFO("IOHandle::Read: fd = %ld. read %ld bytes\n", fd_, read_bytes);
 | 
| +
 | 
| +  // Resubscribe to read events. We resubscribe to events even if read() returns
 | 
| +  // an error. The error might be, e.g. EWOULDBLOCK, in which case
 | 
| +  // re-subscription is necessary. Logic in the caller decides which errors are
 | 
| +  // real, and which are ignore-and-continue.
 | 
| +  read_events_enabled_ = true;
 | 
| +  if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
 | 
| +    LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
 | 
|    }
 | 
| -  if ((events & EPOLLRDHUP) != 0) {
 | 
| -    Log::PrintErr("EPOLLRDHUP ");
 | 
| +
 | 
| +  errno = err;
 | 
| +  return read_bytes;
 | 
| +}
 | 
| +
 | 
| +
 | 
| +intptr_t IOHandle::Write(const void* buffer, intptr_t num_bytes) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  const ssize_t written_bytes =
 | 
| +      NO_RETRY_EXPECTED(write(fd_, buffer, num_bytes));
 | 
| +  const int err = errno;
 | 
| +  LOG_INFO("IOHandle::Write: fd = %ld. wrote %ld bytes\n", fd_, written_bytes);
 | 
| +
 | 
| +  // Resubscribe to write events.
 | 
| +  write_events_enabled_ = true;
 | 
| +  if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLOUT, wait_key_)) {
 | 
| +    LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
 | 
|    }
 | 
| -  int all_events =
 | 
| -      EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
 | 
| -  if ((events & ~all_events) != 0) {
 | 
| -    Log::PrintErr("(and %08x) ", events & ~all_events);
 | 
| +
 | 
| +  errno = err;
 | 
| +  return written_bytes;
 | 
| +}
 | 
| +
 | 
| +
 | 
| +intptr_t IOHandle::Accept(struct sockaddr* addr, socklen_t* addrlen) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  const intptr_t socket = NO_RETRY_EXPECTED(accept(fd_, addr, addrlen));
 | 
| +  const int err = errno;
 | 
| +  LOG_INFO("IOHandle::Accept: fd = %ld. socket = %ld\n", fd_, socket);
 | 
| +
 | 
| +  // Re-subscribe to read events.
 | 
| +  read_events_enabled_ = true;
 | 
| +  if (!AsyncWaitLocked(MX_HANDLE_INVALID, EPOLLIN, wait_key_)) {
 | 
| +    LOG_ERR("IOHandle::AsyncWait failed for fd = %ld\n", fd_);
 | 
|    }
 | 
|  
 | 
| -  Log::PrintErr("\n");
 | 
| +  errno = err;
 | 
| +  return socket;
 | 
| +}
 | 
| +
 | 
| +
 | 
| +void IOHandle::Close() {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  VOID_NO_RETRY_EXPECTED(close(fd_));
 | 
|  }
 | 
| -#endif
 | 
|  
 | 
|  
 | 
| -intptr_t DescriptorInfo::GetPollEvents() {
 | 
| +uint32_t IOHandle::MaskToEpollEvents(intptr_t mask) {
 | 
| +  MutexLocker ml(mutex_);
 | 
|    // Do not ask for EPOLLERR and EPOLLHUP explicitly as they are
 | 
|    // triggered anyway.
 | 
| -  intptr_t events = 0;
 | 
| -  if ((Mask() & (1 << kInEvent)) != 0) {
 | 
| +  uint32_t events = EPOLLRDHUP;
 | 
| +  if (read_events_enabled_ && ((mask & (1 << kInEvent)) != 0)) {
 | 
|      events |= EPOLLIN;
 | 
|    }
 | 
| -  if ((Mask() & (1 << kOutEvent)) != 0) {
 | 
| +  if (write_events_enabled_ && ((mask & (1 << kOutEvent)) != 0)) {
 | 
|      events |= EPOLLOUT;
 | 
|    }
 | 
|    return events;
 | 
|  }
 | 
|  
 | 
|  
 | 
| -// Unregister the file descriptor for a DescriptorInfo structure with
 | 
| -// epoll.
 | 
| -static void RemoveFromEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
 | 
| -  LOG_INFO("RemoveFromEpollInstance: fd = %ld\n", di->fd());
 | 
| -  VOID_NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, di->fd(), NULL));
 | 
| +intptr_t IOHandle::EpollEventsToMask(intptr_t events) {
 | 
| +  if ((events & EPOLLERR) != 0) {
 | 
| +    // Return error only if EPOLLIN is present.
 | 
| +    return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
 | 
| +  }
 | 
| +  intptr_t event_mask = 0;
 | 
| +  if ((events & EPOLLIN) != 0) {
 | 
| +    event_mask |= (1 << kInEvent);
 | 
| +  }
 | 
| +  if ((events & EPOLLOUT) != 0) {
 | 
| +    event_mask |= (1 << kOutEvent);
 | 
| +  }
 | 
| +  if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
 | 
| +    event_mask |= (1 << kCloseEvent);
 | 
| +  }
 | 
| +  return event_mask;
 | 
|  }
 | 
|  
 | 
|  
 | 
| -static void AddToEpollInstance(intptr_t epoll_fd_, DescriptorInfo* di) {
 | 
| -  struct epoll_event event;
 | 
| -  event.events = EPOLLRDHUP | di->GetPollEvents();
 | 
| -  if (!di->IsListeningSocket()) {
 | 
| -    event.events |= EPOLLET;
 | 
| -  }
 | 
| -  event.data.ptr = di;
 | 
| -  LOG_INFO("AddToEpollInstance: fd = %ld\n", di->fd());
 | 
| -  int status =
 | 
| -      NO_RETRY_EXPECTED(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, di->fd(), &event));
 | 
| -  LOG_INFO("AddToEpollInstance: fd = %ld, status = %d\n", di->fd(), status);
 | 
| -#if defined(EVENTHANDLER_LOGGING)
 | 
| -  PrintEventMask(di->fd(), event.events);
 | 
| -#endif
 | 
| -  if (status == -1) {
 | 
| -    // TODO(dart:io): Verify that the dart end is handling this correctly.
 | 
| -
 | 
| -    // Epoll does not accept the file descriptor. It could be due to
 | 
| -    // already closed file descriptor, or unuspported devices, such
 | 
| -    // as /dev/null. In such case, mark the file descriptor as closed,
 | 
| -    // so dart will handle it accordingly.
 | 
| -    di->NotifyAllDartPorts(1 << kCloseEvent);
 | 
| +bool IOHandle::AsyncWaitLocked(mx_handle_t port,
 | 
| +                               uint32_t events,
 | 
| +                               uint64_t key) {
 | 
| +  LOG_INFO("IOHandle::AsyncWait: fd = %ld\n", fd_);
 | 
| +  // The call to __mxio_fd_to_io() in the DescriptorInfo constructor may have
 | 
| +  // returned NULL. If it did, propagate the problem up to Dart.
 | 
| +  if (mxio_ == NULL) {
 | 
| +    LOG_ERR("__mxio_fd_to_io(%d) returned NULL\n", fd_);
 | 
| +    return false;
 | 
|    }
 | 
| +
 | 
| +  mx_handle_t handle;
 | 
| +  mx_signals_t signals;
 | 
| +  __mxio_wait_begin(mxio_, events, &handle, &signals);
 | 
| +  if (handle == MX_HANDLE_INVALID) {
 | 
| +    LOG_ERR("fd = %ld __mxio_wait_begin returned an invalid handle\n", fd_);
 | 
| +    return false;
 | 
| +  }
 | 
| +
 | 
| +  // Remember the port. Use the remembered port if the argument "port" is
 | 
| +  // MX_HANDLE_INVALID.
 | 
| +  ASSERT((port != MX_HANDLE_INVALID) || (port_ != MX_HANDLE_INVALID));
 | 
| +  if ((port_ == MX_HANDLE_INVALID) || (port != MX_HANDLE_INVALID)) {
 | 
| +    port_ = port;
 | 
| +  }
 | 
| +
 | 
| +  handle_ = handle;
 | 
| +  wait_key_ = key;
 | 
| +  LOG_INFO("mx_object_wait_async(fd = %ld, signals = %x)\n", fd_, signals);
 | 
| +  mx_status_t status =
 | 
| +      mx_object_wait_async(handle_, port_, key, signals, MX_WAIT_ASYNC_ONCE);
 | 
| +  if (status != MX_OK) {
 | 
| +    LOG_ERR("mx_object_wait_async failed: %s\n", mx_status_get_string(status));
 | 
| +    return false;
 | 
| +  }
 | 
| +
 | 
| +  return true;
 | 
|  }
 | 
|  
 | 
|  
 | 
| -EventHandlerImplementation::EventHandlerImplementation()
 | 
| -    : socket_map_(&HashMap::SamePointerValue, 16) {
 | 
| -  intptr_t result;
 | 
| -  result = NO_RETRY_EXPECTED(pipe(interrupt_fds_));
 | 
| -  if (result != 0) {
 | 
| -    FATAL("Pipe creation failed");
 | 
| +bool IOHandle::AsyncWait(mx_handle_t port, uint32_t events, uint64_t key) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  return AsyncWaitLocked(port, events, key);
 | 
| +}
 | 
| +
 | 
| +
 | 
| +void IOHandle::CancelWait(mx_handle_t port, uint64_t key) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  LOG_INFO("IOHandle::CancelWait: fd = %ld\n", fd_);
 | 
| +  ASSERT(port != MX_HANDLE_INVALID);
 | 
| +  ASSERT(handle_ != MX_HANDLE_INVALID);
 | 
| +  mx_status_t status = mx_port_cancel(port, handle_, key);
 | 
| +  if ((status != MX_OK) && (status != MX_ERR_NOT_FOUND)) {
 | 
| +    LOG_ERR("mx_port_cancel failed: %s\n", mx_status_get_string(status));
 | 
| +  }
 | 
| +}
 | 
| +
 | 
| +
 | 
| +uint32_t IOHandle::WaitEnd(mx_signals_t observed) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  uint32_t events = 0;
 | 
| +  __mxio_wait_end(mxio_, observed, &events);
 | 
| +  return events;
 | 
| +}
 | 
| +
 | 
| +
 | 
| +intptr_t IOHandle::ToggleEvents(intptr_t event_mask) {
 | 
| +  MutexLocker ml(mutex_);
 | 
| +  if (!write_events_enabled_) {
 | 
| +    LOG_INFO("IOHandle::ToggleEvents: fd = %ld de-asserting write\n", fd_);
 | 
| +    event_mask = event_mask & ~(1 << kOutEvent);
 | 
|    }
 | 
| -  if (!FDUtils::SetNonBlocking(interrupt_fds_[0])) {
 | 
| -    FATAL("Failed to set pipe fd non blocking\n");
 | 
| +  if ((event_mask & (1 << kOutEvent)) != 0) {
 | 
| +    LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting write and disabling\n",
 | 
| +             fd_);
 | 
| +    write_events_enabled_ = false;
 | 
|    }
 | 
| -  if (!FDUtils::SetCloseOnExec(interrupt_fds_[0])) {
 | 
| -    FATAL("Failed to set pipe fd close on exec\n");
 | 
| +  if (!read_events_enabled_) {
 | 
| +    LOG_INFO("IOHandle::ToggleEvents: fd=%ld de-asserting read\n", fd_);
 | 
| +    event_mask = event_mask & ~(1 << kInEvent);
 | 
|    }
 | 
| -  if (!FDUtils::SetCloseOnExec(interrupt_fds_[1])) {
 | 
| -    FATAL("Failed to set pipe fd close on exec\n");
 | 
| +  if ((event_mask & (1 << kInEvent)) != 0) {
 | 
| +    LOG_INFO("IOHandle::ToggleEvents: fd = %ld asserting read and disabling\n",
 | 
| +             fd_);
 | 
| +    read_events_enabled_ = false;
 | 
|    }
 | 
| -  shutdown_ = false;
 | 
| -  // The initial size passed to epoll_create is ignore on newer (>=
 | 
| -  // 2.6.8) Linux versions
 | 
| -  static const int kEpollInitialSize = 64;
 | 
| -  epoll_fd_ = NO_RETRY_EXPECTED(epoll_create(kEpollInitialSize));
 | 
| -  if (epoll_fd_ == -1) {
 | 
| -    FATAL1("Failed creating epoll file descriptor: %i", errno);
 | 
| -  }
 | 
| -  if (!FDUtils::SetCloseOnExec(epoll_fd_)) {
 | 
| -    FATAL("Failed to set epoll fd close on exec\n");
 | 
| -  }
 | 
| -  // Register the interrupt_fd with the epoll instance.
 | 
| -  struct epoll_event event;
 | 
| -  event.events = EPOLLIN;
 | 
| -  event.data.ptr = NULL;
 | 
| -  LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld\n", epoll_fd_);
 | 
| -  int status = NO_RETRY_EXPECTED(
 | 
| -      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event));
 | 
| -  LOG_INFO("EventHandlerImplementation(): epoll_ctl: fd = %ld, status = %d\n",
 | 
| -           epoll_fd_, status);
 | 
| -  if (status == -1) {
 | 
| -    FATAL("Failed adding interrupt fd to epoll instance");
 | 
| +  return event_mask;
 | 
| +}
 | 
| +
 | 
| +
 | 
| +void EventHandlerImplementation::AddToPort(mx_handle_t port_handle,
 | 
| +                                           DescriptorInfo* di) {
 | 
| +  const uint32_t events = di->io_handle()->MaskToEpollEvents(di->Mask());
 | 
| +  const uint64_t key = reinterpret_cast<uint64_t>(di);
 | 
| +  if (!di->io_handle()->AsyncWait(port_handle, events, key)) {
 | 
| +    di->NotifyAllDartPorts(1 << kCloseEvent);
 | 
|    }
 | 
|  }
 | 
|  
 | 
|  
 | 
| +void EventHandlerImplementation::RemoveFromPort(mx_handle_t port_handle,
 | 
| +                                                DescriptorInfo* di) {
 | 
| +  const uint64_t key = reinterpret_cast<uint64_t>(di);
 | 
| +  di->io_handle()->CancelWait(port_handle, key);
 | 
| +}
 | 
| +
 | 
| +
 | 
| +EventHandlerImplementation::EventHandlerImplementation()
 | 
| +    : socket_map_(&HashMap::SamePointerValue, 16) {
 | 
| +  shutdown_ = false;
 | 
| +  // Create the port.
 | 
| +  port_handle_ = MX_HANDLE_INVALID;
 | 
| +  mx_status_t status = mx_port_create(MX_PORT_OPT_V2, &port_handle_);
 | 
| +  if (status != MX_OK) {
 | 
| +    // This is a FATAL because the VM won't work at all if we can't create this
 | 
| +    // port.
 | 
| +    FATAL1("mx_port_create failed: %s\n", mx_status_get_string(status));
 | 
| +  }
 | 
| +  ASSERT(port_handle_ != MX_HANDLE_INVALID);
 | 
| +}
 | 
| +
 | 
| +
 | 
|  static void DeleteDescriptorInfo(void* info) {
 | 
|    DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(info);
 | 
| +  LOG_INFO("Closed %ld\n", di->io_handle()->fd());
 | 
|    di->Close();
 | 
| -  LOG_INFO("Closed %d\n", di->fd());
 | 
|    delete di;
 | 
|  }
 | 
|  
 | 
|  
 | 
|  EventHandlerImplementation::~EventHandlerImplementation() {
 | 
|    socket_map_.Clear(DeleteDescriptorInfo);
 | 
| -  VOID_NO_RETRY_EXPECTED(close(epoll_fd_));
 | 
| -  VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[0]));
 | 
| -  VOID_NO_RETRY_EXPECTED(close(interrupt_fds_[1]));
 | 
| +  mx_handle_close(port_handle_);
 | 
| +  port_handle_ = MX_HANDLE_INVALID;
 | 
|  }
 | 
|  
 | 
|  
 | 
| -void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
 | 
| -                                                     DescriptorInfo* di) {
 | 
| -  intptr_t new_mask = di->Mask();
 | 
| -  LOG_INFO("UpdateEpollInstance: %d old=%ld, new=%ld\n", di->fd(), old_mask,
 | 
| -           new_mask);
 | 
| +void EventHandlerImplementation::UpdatePort(intptr_t old_mask,
 | 
| +                                            DescriptorInfo* di) {
 | 
| +  const intptr_t new_mask = di->Mask();
 | 
|    if ((old_mask != 0) && (new_mask == 0)) {
 | 
| -    RemoveFromEpollInstance(epoll_fd_, di);
 | 
| +    RemoveFromPort(port_handle_, di);
 | 
|    } else if ((old_mask == 0) && (new_mask != 0)) {
 | 
| -    AddToEpollInstance(epoll_fd_, di);
 | 
| -  } else if ((old_mask != 0) && (new_mask != 0) && (old_mask != new_mask)) {
 | 
| +    AddToPort(port_handle_, di);
 | 
| +  } else if ((old_mask != 0) && (new_mask != 0)) {
 | 
|      ASSERT(!di->IsListeningSocket());
 | 
| -    RemoveFromEpollInstance(epoll_fd_, di);
 | 
| -    AddToEpollInstance(epoll_fd_, di);
 | 
| +    RemoveFromPort(port_handle_, di);
 | 
| +    AddToPort(port_handle_, di);
 | 
|    }
 | 
|  }
 | 
|  
 | 
| @@ -198,9 +328,11 @@ void EventHandlerImplementation::UpdateEpollInstance(intptr_t old_mask,
 | 
|  DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
 | 
|      intptr_t fd,
 | 
|      bool is_listening) {
 | 
| -  ASSERT(fd >= 0);
 | 
| -  HashMap::Entry* entry = socket_map_.Lookup(GetHashmapKeyFromFd(fd),
 | 
| -                                             GetHashmapHashFromFd(fd), true);
 | 
| +  IOHandle* handle = reinterpret_cast<IOHandle*>(fd);
 | 
| +  ASSERT(handle->fd() >= 0);
 | 
| +  HashMap::Entry* entry =
 | 
| +      socket_map_.Lookup(GetHashmapKeyFromFd(handle->fd()),
 | 
| +                         GetHashmapHashFromFd(handle->fd()), true);
 | 
|    ASSERT(entry != NULL);
 | 
|    DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(entry->value);
 | 
|    if (di == NULL) {
 | 
| @@ -218,210 +350,153 @@ DescriptorInfo* EventHandlerImplementation::GetDescriptorInfo(
 | 
|  }
 | 
|  
 | 
|  
 | 
| -static ssize_t WriteToBlocking(int fd, const void* buffer, size_t count) {
 | 
| -  size_t remaining = count;
 | 
| -  char* buffer_pos = const_cast<char*>(reinterpret_cast<const char*>(buffer));
 | 
| -  while (remaining > 0) {
 | 
| -    ssize_t bytes_written = NO_RETRY_EXPECTED(write(fd, buffer_pos, remaining));
 | 
| -    if (bytes_written == 0) {
 | 
| -      return count - remaining;
 | 
| -    } else if (bytes_written == -1) {
 | 
| -      ASSERT(EAGAIN == EWOULDBLOCK);
 | 
| -      // Error code EWOULDBLOCK should only happen for non blocking
 | 
| -      // file descriptors.
 | 
| -      ASSERT(errno != EWOULDBLOCK);
 | 
| -      return -1;
 | 
| -    } else {
 | 
| -      ASSERT(bytes_written > 0);
 | 
| -      remaining -= bytes_written;
 | 
| -      buffer_pos += bytes_written;
 | 
| -    }
 | 
| -  }
 | 
| -  return count;
 | 
| -}
 | 
| -
 | 
| -
 | 
|  void EventHandlerImplementation::WakeupHandler(intptr_t id,
 | 
|                                                 Dart_Port dart_port,
 | 
|                                                 int64_t data) {
 | 
| -  InterruptMessage msg;
 | 
| -  msg.id = id;
 | 
| -  msg.dart_port = dart_port;
 | 
| -  msg.data = data;
 | 
| -  // WriteToBlocking will write up to 512 bytes atomically, and since our msg
 | 
| -  // is smaller than 512, we don't need a thread lock.
 | 
| -  // See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
 | 
| -  ASSERT(kInterruptMessageSize < PIPE_BUF);
 | 
| -  intptr_t result =
 | 
| -      WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
 | 
| -  if (result != kInterruptMessageSize) {
 | 
| -    if (result == -1) {
 | 
| -      perror("Interrupt message failure:");
 | 
| -    }
 | 
| -    FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
 | 
| +  COMPILE_ASSERT(sizeof(InterruptMessage) <= sizeof(mx_packet_user_t));
 | 
| +  mx_port_packet_t pkt;
 | 
| +  InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt.user);
 | 
| +  pkt.key = kInterruptPacketKey;
 | 
| +  msg->id = id;
 | 
| +  msg->dart_port = dart_port;
 | 
| +  msg->data = data;
 | 
| +  mx_status_t status =
 | 
| +      mx_port_queue(port_handle_, reinterpret_cast<void*>(&pkt), 0);
 | 
| +  if (status != MX_OK) {
 | 
| +    // This is a FATAL because the VM won't work at all if we can't send any
 | 
| +    // messages to the EventHandler thread.
 | 
| +    FATAL1("mx_port_queue failed: %s\n", mx_status_get_string(status));
 | 
|    }
 | 
|  }
 | 
|  
 | 
|  
 | 
| -void EventHandlerImplementation::HandleInterruptFd() {
 | 
| -  const intptr_t MAX_MESSAGES = kInterruptMessageSize;
 | 
| -  InterruptMessage msg[MAX_MESSAGES];
 | 
| -  ssize_t bytes = NO_RETRY_EXPECTED(
 | 
| -      read(interrupt_fds_[0], msg, MAX_MESSAGES * kInterruptMessageSize));
 | 
| -  LOG_INFO("HandleInterruptFd read %ld bytes\n", bytes);
 | 
| -  for (ssize_t i = 0; i < bytes / kInterruptMessageSize; i++) {
 | 
| -    if (msg[i].id == kTimerId) {
 | 
| -      LOG_INFO("HandleInterruptFd read timer update\n");
 | 
| -      timeout_queue_.UpdateTimeout(msg[i].dart_port, msg[i].data);
 | 
| -    } else if (msg[i].id == kShutdownId) {
 | 
| -      LOG_INFO("HandleInterruptFd read shutdown\n");
 | 
| -      shutdown_ = true;
 | 
| -    } else {
 | 
| -      ASSERT((msg[i].data & COMMAND_MASK) != 0);
 | 
| -      LOG_INFO("HandleInterruptFd command\n");
 | 
| -      Socket* socket = reinterpret_cast<Socket*>(msg[i].id);
 | 
| -      RefCntReleaseScope<Socket> rs(socket);
 | 
| -      if (socket->fd() == -1) {
 | 
| -        continue;
 | 
| +void EventHandlerImplementation::HandleInterrupt(InterruptMessage* msg) {
 | 
| +  if (msg->id == kTimerId) {
 | 
| +    LOG_INFO("HandleInterrupt read timer update\n");
 | 
| +    timeout_queue_.UpdateTimeout(msg->dart_port, msg->data);
 | 
| +    return;
 | 
| +  } else if (msg->id == kShutdownId) {
 | 
| +    LOG_INFO("HandleInterrupt read shutdown\n");
 | 
| +    shutdown_ = true;
 | 
| +    return;
 | 
| +  }
 | 
| +  ASSERT((msg->data & COMMAND_MASK) != 0);
 | 
| +  LOG_INFO("HandleInterrupt command:\n");
 | 
| +  Socket* socket = reinterpret_cast<Socket*>(msg->id);
 | 
| +  RefCntReleaseScope<Socket> rs(socket);
 | 
| +  if (socket->fd() == -1) {
 | 
| +    return;
 | 
| +  }
 | 
| +  IOHandle* io_handle = reinterpret_cast<IOHandle*>(socket->fd());
 | 
| +  const intptr_t fd = io_handle->fd();
 | 
| +  DescriptorInfo* di =
 | 
| +      GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg->data));
 | 
| +  ASSERT(io_handle == di->io_handle());
 | 
| +  if (IS_COMMAND(msg->data, kShutdownReadCommand)) {
 | 
| +    ASSERT(!di->IsListeningSocket());
 | 
| +    // Close the socket for reading.
 | 
| +    LOG_INFO("\tSHUT_RD: %ld\n", fd);
 | 
| +    VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_RD));
 | 
| +  } else if (IS_COMMAND(msg->data, kShutdownWriteCommand)) {
 | 
| +    ASSERT(!di->IsListeningSocket());
 | 
| +    // Close the socket for writing.
 | 
| +    LOG_INFO("\tSHUT_WR: %ld\n", fd);
 | 
| +    VOID_NO_RETRY_EXPECTED(shutdown(fd, SHUT_WR));
 | 
| +  } else if (IS_COMMAND(msg->data, kCloseCommand)) {
 | 
| +    // Close the socket and free system resources and move on to next
 | 
| +    // message.
 | 
| +    const intptr_t old_mask = di->Mask();
 | 
| +    Dart_Port port = msg->dart_port;
 | 
| +    di->RemovePort(port);
 | 
| +    const intptr_t new_mask = di->Mask();
 | 
| +    UpdatePort(old_mask, di);
 | 
| +
 | 
| +    LOG_INFO("\tCLOSE: %ld: %lx -> %lx\n", fd, old_mask, new_mask);
 | 
| +    if (di->IsListeningSocket()) {
 | 
| +      // We only close the socket file descriptor from the operating
 | 
| +      // system if there are no other dart socket objects which
 | 
| +      // are listening on the same (address, port) combination.
 | 
| +      ListeningSocketRegistry* registry = ListeningSocketRegistry::Instance();
 | 
| +
 | 
| +      MutexLocker locker(registry->mutex());
 | 
| +
 | 
| +      if (registry->CloseSafe(socket)) {
 | 
| +        ASSERT(new_mask == 0);
 | 
| +        socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
 | 
| +        di->Close();
 | 
| +        delete di;
 | 
| +        socket->SetClosedFd();
 | 
|        }
 | 
| -      DescriptorInfo* di =
 | 
| -          GetDescriptorInfo(socket->fd(), IS_LISTENING_SOCKET(msg[i].data));
 | 
| -      if (IS_COMMAND(msg[i].data, kShutdownReadCommand)) {
 | 
| -        ASSERT(!di->IsListeningSocket());
 | 
| -        // Close the socket for reading.
 | 
| -        LOG_INFO("\tSHUT_RD: %d\n", di->fd());
 | 
| -        VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_RD));
 | 
| -      } else if (IS_COMMAND(msg[i].data, kShutdownWriteCommand)) {
 | 
| -        ASSERT(!di->IsListeningSocket());
 | 
| -        // Close the socket for writing.
 | 
| -        LOG_INFO("\tSHUT_WR: %d\n", di->fd());
 | 
| -        VOID_NO_RETRY_EXPECTED(shutdown(di->fd(), SHUT_WR));
 | 
| -      } else if (IS_COMMAND(msg[i].data, kCloseCommand)) {
 | 
| -        // Close the socket and free system resources and move on to next
 | 
| -        // message.
 | 
| -        intptr_t old_mask = di->Mask();
 | 
| -        Dart_Port port = msg[i].dart_port;
 | 
| -        di->RemovePort(port);
 | 
| -        intptr_t new_mask = di->Mask();
 | 
| -        UpdateEpollInstance(old_mask, di);
 | 
| -
 | 
| -        LOG_INFO("\tCLOSE: %d: %lx -> %lx\n", di->fd(), old_mask, new_mask);
 | 
| -        intptr_t fd = di->fd();
 | 
| -        if (di->IsListeningSocket()) {
 | 
| -          // We only close the socket file descriptor from the operating
 | 
| -          // system if there are no other dart socket objects which
 | 
| -          // are listening on the same (address, port) combination.
 | 
| -          ListeningSocketRegistry* registry =
 | 
| -              ListeningSocketRegistry::Instance();
 | 
| -
 | 
| -          MutexLocker locker(registry->mutex());
 | 
| -
 | 
| -          if (registry->CloseSafe(socket)) {
 | 
| -            ASSERT(new_mask == 0);
 | 
| -            socket_map_.Remove(GetHashmapKeyFromFd(fd),
 | 
| -                               GetHashmapHashFromFd(fd));
 | 
| -            di->Close();
 | 
| -            LOG_INFO("Closed %d\n", di->fd());
 | 
| -            delete di;
 | 
| -            socket->SetClosedFd();
 | 
| -          }
 | 
| -        } else {
 | 
| -          ASSERT(new_mask == 0);
 | 
| -          socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
 | 
| -          di->Close();
 | 
| -          LOG_INFO("Closed %d\n", di->fd());
 | 
| -          delete di;
 | 
| -          socket->SetClosedFd();
 | 
| -        }
 | 
| -
 | 
| -        bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
 | 
| -        if (!success) {
 | 
| -          LOG_ERR("Failed to post destroy event to port %ld", port);
 | 
| -        }
 | 
| -      } else if (IS_COMMAND(msg[i].data, kReturnTokenCommand)) {
 | 
| -        int count = TOKEN_COUNT(msg[i].data);
 | 
| -        intptr_t old_mask = di->Mask();
 | 
| -        LOG_INFO("\t Return Token: %d: %lx\n", di->fd(), old_mask);
 | 
| -        di->ReturnTokens(msg[i].dart_port, count);
 | 
| -        UpdateEpollInstance(old_mask, di);
 | 
| -      } else if (IS_COMMAND(msg[i].data, kSetEventMaskCommand)) {
 | 
| -        // `events` can only have kInEvent/kOutEvent flags set.
 | 
| -        intptr_t events = msg[i].data & EVENT_MASK;
 | 
| -        ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
 | 
| -
 | 
| -        intptr_t old_mask = di->Mask();
 | 
| -        LOG_INFO("\t Set Event Mask: %d: %lx %lx\n", di->fd(), old_mask,
 | 
| -                 msg[i].data & EVENT_MASK);
 | 
| -        di->SetPortAndMask(msg[i].dart_port, msg[i].data & EVENT_MASK);
 | 
| -        UpdateEpollInstance(old_mask, di);
 | 
| -      } else {
 | 
| -        UNREACHABLE();
 | 
| +    } else {
 | 
| +      ASSERT(new_mask == 0);
 | 
| +      socket_map_.Remove(GetHashmapKeyFromFd(fd), GetHashmapHashFromFd(fd));
 | 
| +      di->Close();
 | 
| +      delete di;
 | 
| +      socket->SetClosedFd();
 | 
| +    }
 | 
| +    if (port != 0) {
 | 
| +      const bool success = DartUtils::PostInt32(port, 1 << kDestroyedEvent);
 | 
| +      if (!success) {
 | 
| +        LOG_ERR("Failed to post destroy event to port %ld\n", port);
 | 
|        }
 | 
|      }
 | 
| +  } else if (IS_COMMAND(msg->data, kReturnTokenCommand)) {
 | 
| +    const int count = TOKEN_COUNT(msg->data);
 | 
| +    const intptr_t old_mask = di->Mask();
 | 
| +    LOG_INFO("\t Return Token: %ld: %lx\n", fd, old_mask);
 | 
| +    di->ReturnTokens(msg->dart_port, count);
 | 
| +    UpdatePort(old_mask, di);
 | 
| +  } else if (IS_COMMAND(msg->data, kSetEventMaskCommand)) {
 | 
| +    // `events` can only have kInEvent/kOutEvent flags set.
 | 
| +    const intptr_t events = msg->data & EVENT_MASK;
 | 
| +    ASSERT(0 == (events & ~(1 << kInEvent | 1 << kOutEvent)));
 | 
| +
 | 
| +    const intptr_t old_mask = di->Mask();
 | 
| +    LOG_INFO("\t Set Event Mask: %ld: %lx %lx\n", fd, old_mask,
 | 
| +             msg->data & EVENT_MASK);
 | 
| +    di->SetPortAndMask(msg->dart_port, msg->data & EVENT_MASK);
 | 
| +    UpdatePort(old_mask, di);
 | 
| +  } else {
 | 
| +    UNREACHABLE();
 | 
|    }
 | 
| -  LOG_INFO("HandleInterruptFd exit\n");
 | 
|  }
 | 
|  
 | 
|  
 | 
| -intptr_t EventHandlerImplementation::GetPollEvents(intptr_t events,
 | 
| -                                                   DescriptorInfo* di) {
 | 
| -#ifdef EVENTHANDLER_LOGGING
 | 
| -  PrintEventMask(di->fd(), events);
 | 
| -#endif
 | 
| -  if ((events & EPOLLERR) != 0) {
 | 
| -    // Return error only if EPOLLIN is present.
 | 
| -    return ((events & EPOLLIN) != 0) ? (1 << kErrorEvent) : 0;
 | 
| -  }
 | 
| -  intptr_t event_mask = 0;
 | 
| -  if ((events & EPOLLIN) != 0) {
 | 
| -    event_mask |= (1 << kInEvent);
 | 
| -  }
 | 
| -  if ((events & EPOLLOUT) != 0) {
 | 
| -    event_mask |= (1 << kOutEvent);
 | 
| -  }
 | 
| -  if ((events & (EPOLLHUP | EPOLLRDHUP)) != 0) {
 | 
| -    event_mask |= (1 << kCloseEvent);
 | 
| -  }
 | 
| -  return event_mask;
 | 
| -}
 | 
| -
 | 
| -
 | 
| -void EventHandlerImplementation::HandleEvents(struct epoll_event* events,
 | 
| -                                              int size) {
 | 
| -  bool interrupt_seen = false;
 | 
| -  for (int i = 0; i < size; i++) {
 | 
| -    if (events[i].data.ptr == NULL) {
 | 
| -      interrupt_seen = true;
 | 
| -    } else {
 | 
| -      DescriptorInfo* di =
 | 
| -          reinterpret_cast<DescriptorInfo*>(events[i].data.ptr);
 | 
| -      const intptr_t old_mask = di->Mask();
 | 
| -      const intptr_t event_mask = GetPollEvents(events[i].events, di);
 | 
| -      LOG_INFO("HandleEvents: fd=%ld events=%ld\n", di->fd(), event_mask);
 | 
| -      if ((event_mask & (1 << kErrorEvent)) != 0) {
 | 
| -        di->NotifyAllDartPorts(event_mask);
 | 
| -        UpdateEpollInstance(old_mask, di);
 | 
| -      } else if (event_mask != 0) {
 | 
| -        Dart_Port port = di->NextNotifyDartPort(event_mask);
 | 
| -        ASSERT(port != 0);
 | 
| -        UpdateEpollInstance(old_mask, di);
 | 
| -        LOG_INFO("HandleEvents: Posting %ld to %ld for fd=%ld\n", event_mask,
 | 
| -                 port, di->fd());
 | 
| -        bool success = DartUtils::PostInt32(port, event_mask);
 | 
| -        if (!success) {
 | 
| -          // This can happen if e.g. the isolate that owns the port has died
 | 
| -          // for some reason.
 | 
| -          LOG_ERR("Failed to post event for fd %ld to port %ld", di->fd(),
 | 
| -                  port);
 | 
| -        }
 | 
| +void EventHandlerImplementation::HandlePacket(mx_port_packet_t* pkt) {
 | 
| +  LOG_INFO("HandlePacket: Got event packet: key=%lx\n", pkt->key);
 | 
| +  LOG_INFO("HandlePacket: Got event packet: type=%lx\n", pkt->type);
 | 
| +  LOG_INFO("HandlePacket: Got event packet: status=%ld\n", pkt->status);
 | 
| +  if (pkt->type == MX_PKT_TYPE_USER) {
 | 
| +    ASSERT(pkt->key == kInterruptPacketKey);
 | 
| +    InterruptMessage* msg = reinterpret_cast<InterruptMessage*>(&pkt->user);
 | 
| +    HandleInterrupt(msg);
 | 
| +    return;
 | 
| +  }
 | 
| +  LOG_INFO("HandlePacket: Got event packet: observed = %lx\n",
 | 
| +           pkt->signal.observed);
 | 
| +  LOG_INFO("HandlePacket: Got event packet: count = %ld\n", pkt->signal.count);
 | 
| +
 | 
| +  DescriptorInfo* di = reinterpret_cast<DescriptorInfo*>(pkt->key);
 | 
| +  mx_signals_t observed = pkt->signal.observed;
 | 
| +  const intptr_t old_mask = di->Mask();
 | 
| +  const uint32_t epoll_event = di->io_handle()->WaitEnd(observed);
 | 
| +  intptr_t event_mask = IOHandle::EpollEventsToMask(epoll_event);
 | 
| +  if ((event_mask & (1 << kErrorEvent)) != 0) {
 | 
| +    di->NotifyAllDartPorts(event_mask);
 | 
| +  } else if (event_mask != 0) {
 | 
| +    event_mask = di->io_handle()->ToggleEvents(event_mask);
 | 
| +    if (event_mask != 0) {
 | 
| +      Dart_Port port = di->NextNotifyDartPort(event_mask);
 | 
| +      ASSERT(port != 0);
 | 
| +      bool success = DartUtils::PostInt32(port, event_mask);
 | 
| +      if (!success) {
 | 
| +        // This can happen if e.g. the isolate that owns the port has died
 | 
| +        // for some reason.
 | 
| +        LOG_ERR("Failed to post event to port %ld\n", port);
 | 
|        }
 | 
|      }
 | 
|    }
 | 
| -  if (interrupt_seen) {
 | 
| -    // Handle after socket events, so we avoid closing a socket before we handle
 | 
| -    // the current events.
 | 
| -    HandleInterruptFd();
 | 
| -  }
 | 
| +  UpdatePort(old_mask, di);
 | 
|  }
 | 
|  
 | 
|  
 | 
| @@ -448,31 +523,28 @@ void EventHandlerImplementation::HandleTimeout() {
 | 
|  
 | 
|  
 | 
|  void EventHandlerImplementation::Poll(uword args) {
 | 
| -  static const intptr_t kMaxEvents = 16;
 | 
| -  struct epoll_event events[kMaxEvents];
 | 
|    EventHandler* handler = reinterpret_cast<EventHandler*>(args);
 | 
|    EventHandlerImplementation* handler_impl = &handler->delegate_;
 | 
|    ASSERT(handler_impl != NULL);
 | 
|  
 | 
| +  mx_port_packet_t pkt;
 | 
|    while (!handler_impl->shutdown_) {
 | 
|      int64_t millis = handler_impl->GetTimeout();
 | 
|      ASSERT((millis == kInfinityTimeout) || (millis >= 0));
 | 
| -    // TODO(US-109): When the epoll implementation is properly edge-triggered,
 | 
| -    // remove this sleep, which prevents the message queue from being
 | 
| -    // overwhelmed and leading to memory exhaustion.
 | 
| -    usleep(5000);
 | 
| -    LOG_INFO("epoll_wait(millis = %ld)\n", millis);
 | 
| -    intptr_t result = NO_RETRY_EXPECTED(
 | 
| -        epoll_wait(handler_impl->epoll_fd_, events, kMaxEvents, millis));
 | 
| -    ASSERT(EAGAIN == EWOULDBLOCK);
 | 
| -    LOG_INFO("epoll_wait(millis = %ld) -> %ld\n", millis, result);
 | 
| -    if (result < 0) {
 | 
| -      if (errno != EWOULDBLOCK) {
 | 
| -        perror("Poll failed");
 | 
| -      }
 | 
| +
 | 
| +    LOG_INFO("mx_port_wait(millis = %ld)\n", millis);
 | 
| +    mx_status_t status = mx_port_wait(handler_impl->port_handle_,
 | 
| +                                      millis == kInfinityTimeout
 | 
| +                                          ? MX_TIME_INFINITE
 | 
| +                                          : mx_deadline_after(MX_MSEC(millis)),
 | 
| +                                      reinterpret_cast<void*>(&pkt), 0);
 | 
| +    if (status == MX_ERR_TIMED_OUT) {
 | 
| +      handler_impl->HandleTimeout();
 | 
| +    } else if (status != MX_OK) {
 | 
| +      FATAL1("mx_port_wait failed: %s\n", mx_status_get_string(status));
 | 
|      } else {
 | 
|        handler_impl->HandleTimeout();
 | 
| -      handler_impl->HandleEvents(events, result);
 | 
| +      handler_impl->HandlePacket(&pkt);
 | 
|      }
 | 
|    }
 | 
|    DEBUG_ASSERT(ReferenceCounted<Socket>::instances() == 0);
 | 
| 
 |