Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1309)

Unified Diff: mojo/edk/system/data_pipe.h

Issue 1526923006: [mojo] Implement data pipe using a shared buffer. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: mojo/edk/system/data_pipe.h
diff --git a/mojo/edk/system/data_pipe.h b/mojo/edk/system/data_pipe.h
index d2b90bf1ab6a138f26dd358d7d03c7ace4f3bf94..ba1345ba704b6f73a40713a0fc3c011299719471 100644
--- a/mojo/edk/system/data_pipe.h
+++ b/mojo/edk/system/data_pipe.h
@@ -6,9 +6,13 @@
#define MOJO_EDK_SYSTEM_DATA_PIPE_H_
#include <stddef.h>
+#include <vector>
+#include "base/callback_forward.h"
#include "base/compiler_specific.h"
+#include "base/memory/ref_counted.h"
#include "mojo/edk/embedder/platform_handle_vector.h"
+#include "mojo/edk/embedder/platform_shared_buffer.h"
#include "mojo/edk/embedder/scoped_platform_handle.h"
#include "mojo/edk/system/system_impl_export.h"
#include "mojo/public/c/system/data_pipe.h"
@@ -19,9 +23,24 @@ namespace mojo {
namespace edk {
class RawChannel;
+enum DataPipeCommand {
Anand Mistry (off Chromium) 2016/01/08 03:28:02 : uint32_t That way, you can just use this type in
Eliot Courtney 2016/01/08 04:44:39 Done.
+ PLEASE_CREATE_SHARED_BUFFER,
Anand Mistry (off Chromium) 2016/01/08 03:28:02 With the broker, you no longer need to do this. It
Eliot Courtney 2016/01/08 04:44:39 Whoops, I already did this but forgot to upload it
+ NOTIFY_SHARED_BUFFER,
+ DATA_WRITTEN,
+ DATA_READ
+};
+
+struct DataPipeCommandHeader {
+ uint32_t command;
+ size_t num_bytes;
Anand Mistry (off Chromium) 2016/01/08 03:28:02 Never use size_t in a serialised message.
Eliot Courtney 2016/01/08 04:44:39 Done.
+};
+
// Shared code between DataPipeConsumerDispatcher and
-// DataPipeProducerDispatcher.
-class MOJO_SYSTEM_IMPL_EXPORT DataPipe {
+// DataPipeProducerDispatcher. This class is not thread safe -- all locking must
+// be performed by the caller. This class is only intended to be used as a
+// writer or a reader, not both.
+class MOJO_SYSTEM_IMPL_EXPORT DataPipe final
+ : public base::RefCountedThreadSafe<DataPipe> {
public:
// The default options for |MojoCreateDataPipe()|. (Real uses should obtain
// this via |ValidateCreateOptions()| with a null |in_options|; this is
@@ -37,26 +56,119 @@ class MOJO_SYSTEM_IMPL_EXPORT DataPipe {
const MojoCreateDataPipeOptions* in_options,
MojoCreateDataPipeOptions* out_options);
+ explicit DataPipe(const MojoCreateDataPipeOptions& options);
+
+ void Init(ScopedPlatformHandle message_pipe,
+ char* serialized_write_buffer,
+ size_t serialized_write_buffer_size,
+ char* serialized_read_buffer,
+ size_t serialized_read_buffer_size,
+ ScopedPlatformHandle shared_buffer_handle,
+ size_t ring_buffer_start,
+ size_t ring_buffer_size,
+ bool is_producer,
+ const base::Closure& init_callback);
+
// Helper methods used by DataPipeConsumerDispatcher and
// DataPipeProducerDispatcher for serialization and deserialization.
- static void StartSerialize(bool have_channel_handle,
- bool have_shared_memory,
- size_t* max_size,
- size_t* max_platform_handles);
- static void EndSerialize(const MojoCreateDataPipeOptions& options,
- ScopedPlatformHandle channel_handle,
- ScopedPlatformHandle shared_memory_handle,
- size_t shared_memory_size,
- void* destination,
- size_t* actual_size,
- PlatformHandleVector* platform_handles);
+ void StartSerialize(size_t* max_size, size_t* max_platform_handles);
+ void EndSerialize(void* destination,
+ size_t* actual_size,
+ PlatformHandleVector* platform_handles);
static ScopedPlatformHandle Deserialize(
const void* source,
size_t size,
PlatformHandleVector* platform_handles,
MojoCreateDataPipeOptions* options,
- ScopedPlatformHandle* shared_memory_handle,
- size_t* shared_memory_size);
+ ScopedPlatformHandle* channel_shared_handle,
+ size_t* serialized_read_buffer_size,
+ size_t* serialized_write_buffer_size,
+ ScopedPlatformHandle* shared_buffer_handle,
+ size_t* ring_buffer_start,
+ size_t* ring_buffer_size);
+
+ // Returns the number of readable or writable bytes. If there is no valid
+ // shared buffer, returns 0.
+ size_t GetReadableBytes() const;
+ size_t GetWritableBytes() const;
+
+ // Two phase reads and writes. Note that since returned memory has to be
+ // contiguous, these may return less than the values returned by
+ // |GetReadableBytes| / |GetWritableBytes|.
+ void* GetWriteBuffer(size_t* num_bytes);
+ const void* GetReadBuffer(size_t* num_bytes);
+
+ // Normal reads and writes. Will return false if they can't read or write
+ // all the data given.
+ bool WriteDataIntoSharedBuffer(const void* data, size_t num_bytes);
+ bool ReadDataFromSharedBuffer(void* buf, size_t num_bytes);
+
+ // Notify other end of a read or write. Returns false if the other side was
+ // closed (on a RawChannel error).
+ bool NotifyWrite(size_t num_bytes);
+ bool NotifyRead(size_t num_bytes);
+
+ // Sends the other side our shared buffer.
+ void NotifySharedBuffer();
+
+ // Asks the other side to give us a shared buffer.
+ void RequestSharedBuffer();
+
+ // Set our shared buffer.
+ void UpdateSharedBuffer(scoped_refptr<PlatformSharedBuffer> shared_buffer);
+
+ // Reading and writing are split into three parts: 1. reading / writing the
+ // data, telling the other side what we did, and updating the ring buffer to
+ // reflect what we did. The functions |UpdateFromRead| and |UpdateFromWrite|
+ // update the ring buffer state.
+ void UpdateFromRead(size_t num_bytes);
+ void UpdateFromWrite(size_t num_bytes);
+
+ // Releases the raw channel.
+ void Serialize();
+
+ RawChannel* GetChannel() { return channel_; }
+ const MojoCreateDataPipeOptions& GetOptions() { return options_; }
+
+ // Returns whether readable or writable state changed.
+ bool ProcessCommand(const DataPipeCommandHeader& command,
+ ScopedPlatformHandleVectorPtr platform_handles);
+
+ // Shuts down the channel, and releases the shared buffer.
+ void Shutdown();
+
+ // Named after |CreateEquivalentDispatcherAndCloseImplNoLock|. This will
+ // Release() the RawChannel and save its returned read and write buffer, then
+ // swap ourselves into |out|.
+ void CreateEquivalentAndClose(DataPipe* out);
+
+ private:
+ friend class base::RefCountedThreadSafe<DataPipe>;
+ ~DataPipe();
+
+ void* GetSharedBufferBase();
Anand Mistry (off Chromium) 2016/01/08 03:28:02 Make this a uint8_t*. That way, you can avoid all
Eliot Courtney 2016/01/08 04:44:39 Done.
+
+ RawChannel* channel_;
+
+ MojoCreateDataPipeOptions options_;
+
+ // True if we've released the channel because we're about to get serialised
+ // and transported. We also store the channel's read and write buffer, and
+ // handle.
+ bool channel_released_;
+ std::vector<char> serialized_read_buffer_;
+ std::vector<char> serialized_write_buffer_;
+ ScopedPlatformHandle serialized_channel_handle_;
+
+ scoped_refptr<PlatformSharedBuffer> shared_buffer_;
+ // Keep the full mapping of the shared buffer around so we don't have to
+ // recreate it.
+ scoped_ptr<PlatformSharedBufferMapping> mapping_;
+ size_t ring_buffer_start_;
+ size_t ring_buffer_size_;
+
+ // By default, the producer will attempt to create the shared buffer.
+ bool is_producer_;
};
} // namespace edk

Powered by Google App Engine
This is Rietveld 408576698