emugl: Simplify RenderChannel interface.

This patch simplifies the RenderChannel interface and its
implementation (a.k.a. RenderChannelImpl). This has several
benefits:

- The interface is now much closer to the one expected by
  an AndroidPipe instance, making EmuglPipe easier to implement
  (e.g. get rid of atomic variables in it).

- RenderChannelImpl only uses a single lock instead of three,
  and doesn't use atomic variables anymore, thanks to the
  BufferQueue class. This makes the code a lot less confusing
  and removes risks of thread-racy behaviour due to the use
  of multiple locks and atomics at the same time.

Performance-wise, this seems to be comparable to the current
implementation. The following numbers correspond to the
frames/s collected when running the Antutu3D 6.0 "Garden" benchmark,
on an Android API level 23/x86 system image, 1920x1080 resolution,
running on an HPz620 (32 cores at 2.7 GHz) with the "performance"
scaling governor set on all CPUs:

  Official     Current
  release      Tip-of-tree    With this patch

    15.07         17.13          17.52
    15.35         17.36          17.52
    15.06         17.34          17.37

Change-Id: I1f0e998b23c38051c8de519a3cab3f6a961be930
diff --git a/android/opengl/OpenglEsPipe.cpp b/android/opengl/OpenglEsPipe.cpp
index 3b3f27f..9e0491c 100644
--- a/android/opengl/OpenglEsPipe.cpp
+++ b/android/opengl/OpenglEsPipe.cpp
@@ -22,7 +22,7 @@
 #include <string.h>
 
 // Set to 1 or 2 for debug traces
-//#define DEBUG 1
+#define DEBUG 0
 
 #if DEBUG >= 1
 #define D(...) printf(__VA_ARGS__), printf("\n"), fflush(stdout)
@@ -36,9 +36,11 @@
 #define DD(...) ((void)0)
 #endif
 
+using ChannelBuffer = emugl::RenderChannel::Buffer;
 using emugl::RenderChannel;
 using emugl::RenderChannelPtr;
 using ChannelState = emugl::RenderChannel::State;
+using IoResult = emugl::RenderChannel::IoResult;
 
 namespace android {
 namespace opengl {
@@ -78,8 +80,7 @@
     /////////////////////////////////////////////////////////////////////////
     // Constructor, check that |mIsWorking| is true after this call to verify
     // that everything went well.
-    EmuglPipe(void* hwPipe,
-              Service* service,
+    EmuglPipe(void* hwPipe, Service* service,
               const emugl::RendererPtr& renderer)
         : AndroidPipe(hwPipe, service) {
         mChannel = renderer->createRenderChannel();
@@ -89,10 +90,10 @@
         }
 
         mIsWorking = true;
-        mChannel->setEventCallback([this](ChannelState state,
-                                          RenderChannel::EventSource source) {
-            this->onChannelIoEvent(state);
-        });
+        mChannel->setEventCallback(
+                [this](RenderChannel::State events) {
+                    this->onChannelHostEvent(events);
+                });
     }
 
     //////////////////////////////////////////////////////////////////////////
@@ -102,20 +103,27 @@
         D("%s", __func__);
         mIsWorking = false;
         mChannel->stop();
-        // stop callback will call close() which deletes the pipe
+        delete this;
     }
 
     virtual unsigned onGuestPoll() const override {
         DD("%s", __func__);
 
         unsigned ret = 0;
-        ChannelState state = mChannel->currentState();
-        if (canReadAny(state)) {
+        if (mDataForReadingLeft > 0) {
             ret |= PIPE_POLL_IN;
         }
-        if (canWrite(state)) {
+        ChannelState state = mChannel->state();
+        if ((state & ChannelState::CanRead) != 0) {
+            ret |= PIPE_POLL_IN;
+        }
+        if ((state & ChannelState::CanWrite) != 0) {
             ret |= PIPE_POLL_OUT;
         }
+        if ((state & ChannelState::Stopped) != 0) {
+            ret |= PIPE_POLL_HUP;
+        }
+        DD("%s: returning %d", __func__, ret);
         return ret;
     }
 
@@ -123,8 +131,8 @@
             override {
         DD("%s", __func__);
 
-        // Consume the pipe's dataForReading, then put the next received data piece
-        // there. Repeat until the buffers are full or we're out of data
+        // Consume the pipe's dataForReading, then put the next received data
+        // piece there. Repeat until the buffers are full or we're out of data
         // in the channel.
         int len = 0;
         size_t buffOffset = 0;
@@ -134,29 +142,37 @@
         while (buff != buffEnd) {
             if (mDataForReadingLeft == 0) {
                 // No data left, read a new chunk from the channel.
-                //
-                // Spin a little bit here: many GL calls are much faster than
-                // the whole host-to-guest-to-host transition.
                 int spinCount = 20;
-                while (!mChannel->read(&mDataForReading,
-                                    RenderChannel::CallType::Nonblocking)) {
-                    if (mChannel->isStopped()) {
-                        return PIPE_ERROR_IO;
-                    } else if (len == 0) {
-                        if (canRead(mChannel->currentState()) || --spinCount > 0) {
-                            continue;
-                        }
-                        return PIPE_ERROR_AGAIN;
-                    } else {
+                for (;;) {
+                    auto result = mChannel->tryRead(&mDataForReading);
+                    if (result == IoResult::Ok) {
+                        mDataForReadingLeft = mDataForReading.size();
+                        break;
+                    }
+                    DD("%s: tryRead() failed with %d", __func__, (int)result);
+                    // This failed either because the channel was stopped
+                    // from the host, or if there was no data yet in the
+                    // channel.
+                    if (len > 0) {
+                        DD("%s: returning %d bytes", __func__, (int)len);
                         return len;
                     }
+                    if (result == IoResult::Error) {
+                        return PIPE_ERROR_IO;
+                    }
+                    // Spin a little before declaring there is nothing
+                    // to read. Many GL calls are much faster than the
+                    // whole host-to-guest-to-host transition.
+                    if (--spinCount > 0) {
+                        continue;
+                    }
+                    DD("%s: returning PIPE_ERROR_AGAIN", __func__);
+                    return PIPE_ERROR_AGAIN;
                 }
-
-                mDataForReadingLeft = mDataForReading.size();
             }
 
             const size_t curSize =
-                    std::min(buff->size - buffOffset, mDataForReadingLeft.load());
+                    std::min(buff->size - buffOffset, mDataForReadingLeft);
             memcpy(buff->data + buffOffset,
                 mDataForReading.data() +
                         (mDataForReading.size() - mDataForReadingLeft),
@@ -171,6 +187,7 @@
             }
         }
 
+        DD("%s: received %d bytes", __func__, (int)len);
         return len;
     }
 
@@ -178,8 +195,9 @@
                             int numBuffers) override {
         DD("%s", __func__);
 
-        if (int ret = sendReadyStatus()) {
-            return ret;
+        if (!mIsWorking) {
+            DD("%s: pipe already closed!", __func__);
+            return PIPE_ERROR_IO;
         }
 
         // Count the total bytes to send.
@@ -188,8 +206,8 @@
             count += buffers[n].size;
         }
 
-        // Copy everything into a single RenderChannel::Buffer.
-        RenderChannel::Buffer outBuffer;
+        // Copy everything into a single ChannelBuffer.
+        ChannelBuffer outBuffer;
         outBuffer.resize_noinit(count);
         auto ptr = outBuffer.data();
         for (int n = 0; n < numBuffers; ++n) {
@@ -197,9 +215,12 @@
             ptr += buffers[n].size;
         }
 
+        D("%s: sending %d bytes to host", __func__, count);
         // Send it through the channel.
-        if (!mChannel->write(std::move(outBuffer))) {
-            return PIPE_ERROR_IO;
+        auto result = mChannel->tryWrite(std::move(outBuffer));
+        if (result != IoResult::Ok) {
+            D("%s: tryWrite() failed with %d", __func__, (int)result);
+            return result == IoResult::Error ? PIPE_ERROR_IO : PIPE_ERROR_AGAIN;
         }
 
         return count;
@@ -208,112 +229,66 @@
     virtual void onGuestWantWakeOn(int flags) override {
         DD("%s: flags=%d", __func__, flags);
 
-        // We reset these flags when we actually wake the pipe, so only
-        // add to them here.
-        mCareAboutRead |= (flags & PIPE_WAKE_READ) != 0;
-        mCareAboutWrite |= (flags & PIPE_WAKE_WRITE) != 0;
+        // Translate |flags| into ChannelState flags.
+        ChannelState wanted = ChannelState::Empty;
+        if (flags & PIPE_WAKE_READ) {
+            wanted |= ChannelState::CanRead;
+        }
+        if (flags & PIPE_WAKE_WRITE) {
+            wanted |= ChannelState::CanWrite;
+        }
 
-        ChannelState state = mChannel->currentState();
-        processIoEvents(state);
+        // Signal events that are already available now.
+        ChannelState state = mChannel->state();
+        ChannelState available = state & wanted;
+        DD("%s: state=%d wanted=%d available=%d", __func__, (int)state,
+           (int)wanted, (int)available);
+        if (available != ChannelState::Empty) {
+            DD("%s: signaling events %d", __func__, (int)available);
+            signalState(available);
+            wanted &= ~available;
+        }
+
+        // Ask the channel to be notified of remaining events.
+        if (wanted != ChannelState::Empty) {
+            DD("%s: waiting for events %d", __func__, (int)wanted);
+            mChannel->setWantedEvents(wanted);
+        }
     }
 
 private:
-    // Returns true iff there is data to read from the emugl channel.
-    static bool canRead(ChannelState state) {
-        return (state & ChannelState::CanRead) != ChannelState::Empty;
-    }
-
-    // Returns true iff the emugl channel can accept data.
-    static bool canWrite(ChannelState state) {
-        return (state & ChannelState::CanWrite) != ChannelState::Empty;
-    }
-
-    // Returns true iff there is data to read from the local cache or from
-    // the emugl channel.
-    bool canReadAny(ChannelState state) const {
-        return mDataForReadingLeft != 0 || canRead(state);
-    }
-
-    // Check that the pipe is working and that the render channel can be
-    // written to. Return 0 in case of success, and one PIPE_ERROR_XXX
-    // value on error.
-    int sendReadyStatus() const {
-        if (mIsWorking) {
-            ChannelState state = mChannel->currentState();
-            if (canWrite(state)) {
-                return 0;
-            }
-            return PIPE_ERROR_AGAIN;
-        } else if (!mHwPipe) {
-            return PIPE_ERROR_IO;
-        } else {
-            return PIPE_ERROR_INVAL;
-        }
-    }
-
-    // Check the read/write state of the render channel and signal the pipe
-    // if any condition meets mCareAboutRead or mCareAboutWrite.
-    void processIoEvents(ChannelState state) {
+    // Called to signal the guest that read/write wake events occurred.
+    // Note: this can be called from either the guest or host render
+    // thread.
+    void signalState(ChannelState state) {
         int wakeFlags = 0;
-
-        if (mCareAboutRead && canReadAny(state)) {
+        if ((state & ChannelState::CanRead) != 0) {
             wakeFlags |= PIPE_WAKE_READ;
-            mCareAboutRead = false;
         }
-        if (mCareAboutWrite && canWrite(state)) {
+        if ((state & ChannelState::CanWrite) != 0) {
             wakeFlags |= PIPE_WAKE_WRITE;
-            mCareAboutWrite = false;
         }
-
-        // Send wake signal to the guest if needed.
         if (wakeFlags != 0) {
-            signalWake(wakeFlags);
+            this->signalWake(wakeFlags);
         }
     }
 
     // Called when an i/o event occurs on the render channel
-    void onChannelIoEvent(ChannelState state) {
-        D("%s: %d", __func__, (int)state);
-
-        if ((state & ChannelState::Stopped) != ChannelState::Empty) {
-            close();
-        } else {
-            processIoEvents(state);
-        }
-    }
-
-    // Close the pipe, this may be called from the host or guest side.
-    void close() {
-        D("%s", __func__);
-
-        // If the pipe isn't in a working state, delete immediately.
-        if (!mIsWorking) {
-            mChannel->stop();
-            delete this;
+    void onChannelHostEvent(ChannelState state) {
+        D("%s: events %d", __func__, (int)state);
+        // NOTE: This is called from the host-side render thread,
+        // but closeFromHost() and signalWake() can be called from
+        // any thread.
+        if ((state & ChannelState::Stopped) != 0) {
+            this->closeFromHost();
             return;
         }
-
-        // Force the closure of the channel - if a guest is blocked
-        // waiting for a wake signal, it will receive an error.
-        if (mHwPipe) {
-            closeFromHost();
-            mHwPipe = nullptr;
-        }
-
-        mIsWorking = false;
+        signalState(state);
     }
 
     // A RenderChannel pointer used for communication.
     RenderChannelPtr mChannel;
 
-    // Guest state tracking - if it requested us to wake on read/write
-    // availability. If guest doesn't care about some operation type, we should
-    // not wake it when that operation becomes available.
-    // Note: we need an atomic or operation, and atomic<bool> doesn't have it -
-    //  that's why it is atomic<char> here.
-    std::atomic<char> mCareAboutRead {false};
-    std::atomic<char> mCareAboutWrite {false};
-
     // Set to |true| if the pipe is in working state, |false| means we're not
     // initialized or the pipe is closed.
     bool mIsWorking = false;
@@ -324,8 +299,8 @@
     // guest-supplied memory.
     // If guest didn't have enough room for the whole buffer, we track the
     // number of remaining bytes in |mDataForReadingLeft| for the next read().
-    RenderChannel::Buffer mDataForReading;
-    std::atomic<size_t> mDataForReadingLeft { 0 };
+    ChannelBuffer mDataForReading;
+    size_t mDataForReadingLeft = 0;
 
     DISALLOW_COPY_ASSIGN_AND_MOVE(EmuglPipe);
 };
diff --git a/distrib/android-emugl/host/include/OpenglRender/IOStream.h b/distrib/android-emugl/host/include/OpenglRender/IOStream.h
index a0df579..46b8c6a 100644
--- a/distrib/android-emugl/host/include/OpenglRender/IOStream.h
+++ b/distrib/android-emugl/host/include/OpenglRender/IOStream.h
@@ -33,7 +33,7 @@
 
 public:
     size_t read(void* buf, size_t bufLen) {
-        if (!read(buf, &bufLen)) {
+        if (!readRaw(buf, &bufLen)) {
             return 0;
         }
         return bufLen;
@@ -75,7 +75,7 @@
 protected:
     virtual void *allocBuffer(size_t minSize) = 0;
     virtual int commitBuffer(size_t size) = 0;
-    virtual const unsigned char *read(void *buf, size_t *inout_len) = 0;
+    virtual const unsigned char *readRaw(void *buf, size_t *inout_len) = 0;
 
 private:
     unsigned char* m_buf = nullptr;
diff --git a/distrib/android-emugl/host/include/OpenglRender/RenderChannel.h b/distrib/android-emugl/host/include/OpenglRender/RenderChannel.h
index a6ee17a..40232e2 100644
--- a/distrib/android-emugl/host/include/OpenglRender/RenderChannel.h
+++ b/distrib/android-emugl/host/include/OpenglRender/RenderChannel.h
@@ -21,12 +21,21 @@
 
 namespace emugl {
 
-// Turn the RenderChannel::Event enum into flags.
+// Turn the RenderChannel::State enum into flags.
 using namespace ::android::base::EnumFlags;
 
-// RenderChannel - an interface for a single guest to host renderer connection.
-// It allows the guest to send GPU emulation protocol-serialized messages to an
-// asynchronous renderer, read the responses and subscribe for state updates.
+// RenderChannel - For each guest-to-host renderer connection, this provides
+// an interface for the guest side to interact with the corresponding renderer
+// thread on the host. Its main purpose is to send and receive wire protocol
+// bytes in an asynchronous way (compatible with Android pipes).
+//
+// Usage is the following:
+//    1) Get an instance pointer through a dedicated Renderer function
+//       (e.g. RendererImpl::createRenderChannel()).
+//
+//    2) Call setEventCallback() to indicate which callback should be called
+//       when the channel's state has changed due to a host thread event.
+//
 class RenderChannel {
 public:
     // A type used to pass byte packets between the guest and the
@@ -37,7 +46,10 @@
     // the one used in protocol-heavy benchmark like Antutu3D.
     using Buffer = android::base::SmallFixedVector<char, 512>;
 
-    // Flags for the channel state.
+    // Bit-flags for the channel state.
+    // |CanRead| means there is data from the host to read.
+    // |CanWrite| means there is room to send data to the host.
+    // |Stopped| means the channel was stopped.
     enum class State {
         // Can't use None here, some system header declares it as a macro.
         Empty = 0,
@@ -57,47 +69,49 @@
         Error = 2,
     };
 
-    // Possible points of origin for an event in EventCallback.
-    enum class EventSource {
-        RenderChannel,
-        Client,
-    };
+    // Type of a callback used to tell the guest when the RenderChannel
+    // state changes. Used by setEventCallback(). The parameter contains
+    // the State bits matching the event, i.e. it is the logical AND of
+    // the last value passed to setWantedEvents() and the current
+    // RenderChannel state.
+    using EventCallback = std::function<void(State)>;
 
-    // Types of read() the channel supports.
-    enum class CallType {
-        Blocking,    // if the call can't do what it needs, block until it can
-        Nonblocking, // immidiately return if the call can't do the job
-    };
+    // Sets a single (!) callback that is called when the channel's state
+    // changes due to an event *from* *the* *host* only. |callback| is a
+    // guest-provided callback that will be called from the host renderer
+    // thread, not the guest one.
+    virtual void setEventCallback(EventCallback&& callback) = 0;
 
-    // Sets a single (!) callback that is called if some event happends that
-    // changes the channel state - e.g. when it's stopped, or it gets some data
-    // the client can read after being empty, or it isn't full anymore and the
-    // client may write again without blocking.
-    // If the state isn't State::Empty, the |callback| is called for the first
-    // time during the setEventCallback() to report this initial state.
-    using EventCallback = std::function<void(State, EventSource)>;
-    virtual void setEventCallback(EventCallback callback) = 0;
-
-    // Writes the data in |buffer| into the channel. |buffer| is moved from.
-    // Blocks if there's no room in the channel (shouldn't really happen).
-    // Returns false if the channel is stopped.
-    virtual bool write(Buffer&& buffer) = 0;
-    // Reads a chunk of data from the channel. Returns false if there was no
-    // data for a non-blocking call or if the channel is stopped.
-    virtual bool read(Buffer* buffer, CallType callType) = 0;
+    // Used to indicate which i/o events the guest wants to be notified
+    // through its EventCallback. |state| must be a combination of
+    // State::CanRead or State::CanWrite only. This will *not* call the
+    // callback directly since this happens in the guest thread.
+    virtual void setWantedEvents(State state) = 0;
 
     // Get the current state flags.
-    virtual State currentState() const = 0;
+    virtual State state() const = 0;
+
+    // Try to write the data in |buffer| into the channel. On success,
+    // return IoResult::Ok and moves |buffer|. On failure, return
+    // IoResult::TryAgain if the channel was full, or IoResult::Error
+    // if it is stopped.
+    virtual IoResult tryWrite(Buffer&& buffer) = 0;
+
+    // Try to read data from the channel. On success, return IoResult::Ok and
+    // sets |*buffer| to contain the data. On failure, return
+    // IoResult::TryAgain if the channel was empty, or IoResult::Error if
+    // it was stopped.
+    virtual IoResult tryRead(Buffer* buffer) = 0;
 
     // Abort all pending operations. Any following operation is a noop.
+    // Once a channel is stopped, it cannot be re-started.
     virtual void stop() = 0;
-    // Check if the channel is stopped.
-    virtual bool isStopped() const = 0;
 
 protected:
     ~RenderChannel() = default;
 };
 
+// Shared pointer to RenderChannel instance.
 using RenderChannelPtr = std::shared_ptr<RenderChannel>;
 
 }  // namespace emugl
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.cpp b/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.cpp
index ac2309b..0f402b9 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.cpp
+++ b/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.cpp
@@ -13,46 +13,81 @@
 // limitations under the License.
 #include "ChannelStream.h"
 
+#include "OpenglRender/RenderChannel.h"
+
+#define EMUGL_DEBUG_LEVEL  0
+#include "emugl/common/debug.h"
+
 #include <assert.h>
+#include <memory.h>
 
 namespace emugl {
 
+using IoResult = RenderChannel::IoResult;
+
 ChannelStream::ChannelStream(std::shared_ptr<RenderChannelImpl> channel,
                              size_t bufSize)
     : IOStream(bufSize), mChannel(channel) {
-    mBuf.resize_noinit(bufSize);
+    mWriteBuffer.resize_noinit(bufSize);
 }
 
 void* ChannelStream::allocBuffer(size_t minSize) {
-    if (mBuf.size() < minSize) {
-        mBuf.resize_noinit(minSize);
+    if (mWriteBuffer.size() < minSize) {
+        mWriteBuffer.resize_noinit(minSize);
     }
-    return mBuf.data();
+    return mWriteBuffer.data();
 }
 
 int ChannelStream::commitBuffer(size_t size) {
-    assert(size <= mBuf.size());
-    if (mBuf.isAllocated()) {
-        mBuf.resize(size);
-        mChannel->writeToGuest(std::move(mBuf));
+    assert(size <= mWriteBuffer.size());
+    if (mWriteBuffer.isAllocated()) {
+        mWriteBuffer.resize(size);
+        mChannel->writeToGuest(std::move(mWriteBuffer));
     } else {
         mChannel->writeToGuest(
-                RenderChannel::Buffer(mBuf.data(), mBuf.data() + size));
+                RenderChannel::Buffer(mWriteBuffer.data(), mWriteBuffer.data() + size));
     }
     return size;
 }
 
-const unsigned char* ChannelStream::read(void* buf, size_t* inout_len) {
-    size_t size = mChannel->readFromGuest((char*)buf, *inout_len, true);
-    if (size == 0) {
+const unsigned char* ChannelStream::readRaw(void* buf, size_t* inout_len) {
+    size_t wanted = *inout_len;
+    size_t count = 0U;
+    auto dst = static_cast<uint8_t*>(buf);
+    D("wanted %d bytes", (int)wanted);
+    while (count < wanted) {
+        if (mReadBufferLeft > 0) {
+            size_t avail = std::min<size_t>(wanted - count, mReadBufferLeft);
+            memcpy(dst + count,
+                   mReadBuffer.data() + (mReadBuffer.size() - mReadBufferLeft),
+                   avail);
+            count += avail;
+            mReadBufferLeft -= avail;
+            continue;
+        }
+        bool blocking = (count == 0);
+        auto result = mChannel->readFromGuest(&mReadBuffer, blocking);
+        D("readFromGuest() returned %d, size %d", (int)result, (int)mReadBuffer.size());
+        if (result == IoResult::Ok) {
+            mReadBufferLeft = mReadBuffer.size();
+            continue;
+        }
+        if (count > 0) {  // There is some data to return.
+            break;
+        }
+        // Result can only be IoResult::Error if |count == 0| since |blocking|
+        // was true, it cannot be IoResult::TryAgain.
+        assert(result == IoResult::Error);
+        D("error while trying to read");
         return nullptr;
     }
-    *inout_len = size;
+    *inout_len = count;
+    D("read %d bytes", (int)count);
     return (const unsigned char*)buf;
 }
 
 void ChannelStream::forceStop() {
-    mChannel->stop();
+    mChannel->stopFromHost();
 }
 
 }  // namespace emugl
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.h b/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.h
index c9c34f6..02b2fe7 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.h
+++ b/distrib/android-emugl/host/libs/libOpenglRender/ChannelStream.h
@@ -17,25 +17,28 @@
 #include "RenderChannelImpl.h"
 
 #include <memory>
-#include <vector>
 
 namespace emugl {
 
+// An IOStream instance that can be used by the host RenderThread to
+// wrap a RenderChannelImpl channel.
 class ChannelStream final : public IOStream {
 public:
-    ChannelStream(std::shared_ptr<RenderChannelImpl> channel,
-                  size_t bufSize);
-
-    virtual void* allocBuffer(size_t minSize) override final;
-    virtual int commitBuffer(size_t size) override final;
-    virtual const unsigned char* read(void* buf,
-                                      size_t* inout_len) override final;
+    ChannelStream(std::shared_ptr<RenderChannelImpl> channel, size_t bufSize);
 
     void forceStop();
 
+protected:
+    virtual void* allocBuffer(size_t minSize) override final;
+    virtual int commitBuffer(size_t size) override final;
+    virtual const unsigned char* readRaw(void* buf, size_t* inout_len)
+            override final;
+
 private:
     std::shared_ptr<RenderChannelImpl> mChannel;
-    RenderChannel::Buffer mBuf;
+    RenderChannel::Buffer mWriteBuffer;
+    RenderChannel::Buffer mReadBuffer;
+    size_t mReadBufferLeft = 0;
 };
 
 }  // namespace emugl
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.cpp b/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.cpp
index c01afc0..db27b41 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.cpp
+++ b/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.cpp
@@ -13,6 +13,8 @@
 // limitations under the License.
 #include "RenderChannelImpl.h"
 
+#include "android/base/synchronization/Lock.h"
+
 #include <algorithm>
 #include <utility>
 
@@ -21,147 +23,130 @@
 
 namespace emugl {
 
-RenderChannelImpl::RenderChannelImpl(std::shared_ptr<RendererImpl> renderer)
-    : mRenderer(std::move(renderer)), mState(State::Empty) {
-    assert(mState.is_lock_free()); // rethink it if this fails - we've already
-                                   // got a lock, use it instead.
+#define EMUGL_DEBUG_LEVEL 0
+#include "emugl/common/debug.h"
+
+using Buffer = RenderChannel::Buffer;
+using IoResult = RenderChannel::IoResult;
+using State = RenderChannel::State;
+using AutoLock = android::base::AutoLock;
+
+// These constants correspond to the capacities of buffer queues
+// used by each RenderChannelImpl instance. Benchmarking shows that
+// it's important to have a large queue for guest -> host transfers,
+// but a much smaller one works for host -> guest ones.
+static constexpr size_t kGuestToHostQueueCapacity = 1024U;
+static constexpr size_t kHostToGuestQueueCapacity = 16U;
+
+RenderChannelImpl::RenderChannelImpl()
+    : mLock(),
+      mFromGuest(kGuestToHostQueueCapacity, mLock),
+      mToGuest(kHostToGuestQueueCapacity, mLock) {
+    updateStateLocked();
 }
 
-void RenderChannelImpl::setEventCallback(
-        RenderChannelImpl::EventCallback callback) {
-    mOnEvent = std::move(callback);
-
-    // Reset the current state to make sure the new subscriber gets it.
-    mState.store(State::Empty, std::memory_order_release);
-    onEvent(true);
+void RenderChannelImpl::setEventCallback(EventCallback&& callback) {
+    mEventCallback = std::move(callback);
 }
 
-bool RenderChannelImpl::write(Buffer&& buffer) {
-    const bool res = mFromGuest.send(std::move(buffer));
-    onEvent(true);
-    return res;
+void RenderChannelImpl::setWantedEvents(State state) {
+    D("state=%d", (int)state);
+    AutoLock lock(mLock);
+    mWantedEvents |= state;
+    notifyStateChangeLocked();
 }
 
-bool RenderChannelImpl::read(Buffer* buffer, CallType type) {
-    if (type == CallType::Nonblocking && mToGuest.size() == 0) {
-        return false;
-    }
-    const bool res = mToGuest.receive(buffer);
-    onEvent(true);
-    return res;
+RenderChannel::State RenderChannelImpl::state() const {
+    AutoLock lock(mLock);
+    return mState;
+}
+
+IoResult RenderChannelImpl::tryWrite(Buffer&& buffer) {
+    D("buffer size=%d", (int)buffer.size());
+    AutoLock lock(mLock);
+    auto result = mFromGuest.tryPushLocked(std::move(buffer));
+    updateStateLocked();
+    DD("mFromGuest.tryPushLocked() returned %d, state %d", (int)result,
+       (int)mState);
+    return result;
+}
+
+IoResult RenderChannelImpl::tryRead(Buffer* buffer) {
+    D("enter");
+    AutoLock lock(mLock);
+    auto result = mToGuest.tryPopLocked(buffer);
+    updateStateLocked();
+    DD("mToGuest.tryPopLocked() returned %d, buffer size %d, state %d",
+       (int)result, (int)buffer->size(), (int)mState);
+    return result;
 }
 
 void RenderChannelImpl::stop() {
-    stop(true);
+    D("enter");
+    AutoLock lock(mLock);
+    mFromGuest.closeLocked();
+    mToGuest.closeLocked();
 }
 
-void RenderChannelImpl::forceStop() {
-    stop(false);
+bool RenderChannelImpl::writeToGuest(Buffer&& buffer) {
+    D("buffer size=%d", (int)buffer.size());
+    AutoLock lock(mLock);
+    IoResult result = mToGuest.pushLocked(std::move(buffer));
+    updateStateLocked();
+    DD("mToGuest.pushLocked() returned %d, state %d", (int)result, (int)mState);
+    notifyStateChangeLocked();
+    return result == IoResult::Ok;
 }
 
-void RenderChannelImpl::stop(bool byGuest) {
-    if (mStopped) {
-        return;
-    }
-    mStopped = true;
-    mFromGuest.stop();
-    mToGuest.stop();
-    onEvent(byGuest);
-}
-
-bool RenderChannelImpl::isStopped() const {
-    return mStopped;
-}
-
-void RenderChannelImpl::writeToGuest(Buffer&& buf) {
-    mToGuest.send(std::move(buf));
-    onEvent(false);
-}
-
-size_t RenderChannelImpl::readFromGuest(Buffer::value_type* buf,
-                                        size_t size,
-                                        bool blocking) {
-    assert(buf);
-
-    size_t read = 0;
-    const auto bufEnd = buf + size;
-    while (buf != bufEnd && !mStopped) {
-        if (mFromGuestBufferLeft == 0) {
-            if (mFromGuest.size() == 0 && (read > 0 || !blocking)) {
-                break;
-            }
-            if (!mFromGuest.receive(&mFromGuestBuffer)) {
-                break;
-            }
-            mFromGuestBufferLeft = mFromGuestBuffer.size();
-        }
-
-        const size_t curSize =
-                std::min<size_t>(bufEnd - buf, mFromGuestBufferLeft);
-        memcpy(buf, mFromGuestBuffer.data() +
-                            (mFromGuestBuffer.size() - mFromGuestBufferLeft),
-               curSize);
-
-        read += curSize;
-        buf += curSize;
-        mFromGuestBufferLeft -= curSize;
-    }
-    onEvent(false);
-    return read;
-}
-
-void RenderChannelImpl::onEvent(bool byGuest) {
-    if (!mOnEvent) {
-        return;
-    }
-
-    // We need to make sure that the state returned from calcState() remains
-    // valid when we write it into mState; this means we need to lock both
-    // calcState() and mState assignment with the same lock - otherwise it is
-    // possible (and it happened before the lock was added) that some other
-    // thread would overwrite a newer state with its older value, e.g.:
-    //  - thread 1 reads the last message from |mToGuest|
-    //  - thread 1 enters onEvent()
-    //  - thread 1 calculates state 1 = "can't read", switches out
-    //  -   thread 2 writes some data into |mToGuest|
-    //  -   thread 2 enters onEvent()
-    //  -   thread 2 calculates state 2 = "can read"
-    //  -   thread 2 gets the lock
-    //  -   thread 2 updates |mState| to "can read", unlocks, switches out
-    //  - thread 1 gets the lock
-    //  - thread 1 overwrites |mState| with state 1 = "can't read"
-    //  - thread 1 unlocks and calls the |mOnEvent| callback.
-    //
-    // The result is that state 2 = "can read" is completely lost - callback
-    // is never called when |mState| is "can read".
-    //
-    // But if the whole block of code is locked, threads can't overwrite newer
-    // |mState| with some older value, and the described situation would never
-    // happen.
-    android::base::AutoLock lock(mStateLock);
-    const State newState = calcState();
-    if (mState != newState) {
-        mState = newState;
-        lock.unlock();
-
-        mOnEvent(newState,
-                 byGuest ? EventSource::Client : EventSource::RenderChannel);
-    }
-}
-
-RenderChannel::State RenderChannelImpl::calcState() const {
-    State state = State::Empty;
-    if (mStopped) {
-        state = State::Stopped;
+IoResult RenderChannelImpl::readFromGuest(Buffer* buffer, bool blocking) {
+    D("enter");
+    AutoLock lock(mLock);
+    IoResult result;
+    if (blocking) {
+        result = mFromGuest.popLocked(buffer);
     } else {
-        if (mFromGuest.size() < mFromGuest.capacity()) {
-            state |= State::CanWrite;
-        }
-        if (mToGuest.size() > 0) {
-            state |= State::CanRead;
-        }
+        result = mFromGuest.tryPopLocked(buffer);
     }
-    return state;
+    updateStateLocked();
+    DD("mFromGuest.%s() return %d, buffer size %d, state %d",
+       blocking ? "popLocked" : "tryPopLocked", (int)result,
+       (int)buffer->size(), (int)mState);
+    notifyStateChangeLocked();
+    return result;
+}
+
+void RenderChannelImpl::stopFromHost() {
+    D("enter");
+
+    AutoLock lock(mLock);
+    mFromGuest.closeLocked();
+    mToGuest.closeLocked();
+    mState |= State::Stopped;
+    notifyStateChangeLocked();
+}
+
+void RenderChannelImpl::updateStateLocked() {
+    State state = RenderChannel::State::Empty;
+
+    if (mToGuest.canPopLocked()) {
+        state |= State::CanRead;
+    }
+    if (mFromGuest.canPushLocked()) {
+        state |= State::CanWrite;
+    }
+    if (mToGuest.isClosedLocked()) {
+        state |= State::Stopped;
+    }
+    mState = state;
+}
+
+void RenderChannelImpl::notifyStateChangeLocked() {
+    State available = mState & mWantedEvents;
+    if (available != 0) {
+        D("callback with %d", (int)available);
+        mWantedEvents &= ~mState;
+        mEventCallback(available);
+    }
 }
 
 }  // namespace emugl
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.h b/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.h
index 5252590..0d306d1 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.h
+++ b/distrib/android-emugl/host/libs/libOpenglRender/RenderChannelImpl.h
@@ -14,80 +14,76 @@
 #pragma once
 
 #include "OpenglRender/RenderChannel.h"
+#include "BufferQueue.h"
 #include "RendererImpl.h"
 
-#include "android/base/Compiler.h"
-#include "android/base/synchronization/Lock.h"
-#include "android/base/synchronization/MessageChannel.h"
-
-#include <atomic>
-#include <memory>
-
 namespace emugl {
 
+// Implementation of the RenderChannel interface that connects a guest
+// client thread (really an AndroidPipe implementation) to a host
+// RenderThread instance.
 class RenderChannelImpl final : public RenderChannel {
 public:
-    RenderChannelImpl(std::shared_ptr<RendererImpl> renderer);
+    // Default constructor.
+    RenderChannelImpl();
 
-public:
-    // RenderChannel implementation, operations provided for a guest system
-    virtual void setEventCallback(EventCallback callback) override final;
+    /////////////////////////////////////////////////////////////////
+    // RenderChannel overridden methods. These are called from the guest
+    // client thread.
 
-    virtual bool write(Buffer&& buffer) override final;
-    virtual bool read(Buffer* buffer, CallType type) override final;
+    // Set the event |callback| to be notified when the host changes the
+    // state of the channel, according to the event mask provided by
+    // setWantedEvents(). Call this function right after creating the
+    // instance.
+    virtual void setEventCallback(EventCallback&& callback) override final;
 
-    virtual State currentState() const override final {
-        return mState.load(std::memory_order_acquire);
-    }
+    // Set the mask of events the guest wants to be notified of from the
+    // host thread.
+    virtual void setWantedEvents(State state) override final;
 
+    // Return the current channel state relative to the guest.
+    virtual State state() const override final;
+
+    // Try to send a buffer from the guest to the host render thread.
+    virtual IoResult tryWrite(Buffer&& buffer) override final;
+
+    // Try to read a buffer from the host render thread into the guest.
+    virtual IoResult tryRead(Buffer* buffer) override final;
+
+    // Close the channel from the guest.
     virtual void stop() override final;
-    virtual bool isStopped() const override final;
 
-public:
-    // These functions are for the RenderThread, they could be called in
-    // parallel with the ones from the RenderChannel interface. Make sure the
-    // internal state remains consistent all the time.
-    void writeToGuest(Buffer&& buf);
-    size_t readFromGuest(Buffer::value_type* buf, size_t size,
-                         bool blocking);
-    void forceStop();
+    /////////////////////////////////////////////////////////////////
+    // These functions are called from the host render thread.
+
+    // Send a buffer to the guest; this call is blocking. On success,
+    // move |buffer| into the channel and return true. On failure, return
+    // false (meaning that the channel was closed).
+    bool writeToGuest(Buffer&& buffer);
+
+    // Read data from the guest. If |blocking| is true, the call will be
+    // blocking. On success, move item into |*buffer| and return
+    // IoResult::Ok. On failure, return IoResult::Error to indicate the
+    // channel was closed, or IoResult::TryAgain to indicate it was empty
+    // (this can happen only if |blocking| is false).
+    IoResult readFromGuest(Buffer* buffer, bool blocking);
+
+    // Close the channel from the host.
+    void stopFromHost();
 
 private:
-    DISALLOW_COPY_ASSIGN_AND_MOVE(RenderChannelImpl);
+    void updateStateLocked();
+    void notifyStateChangeLocked();
 
-private:
-    void onEvent(bool byGuest);
-    State calcState() const;
-    void stop(bool byGuest);
+    EventCallback mEventCallback;
 
-private:
-    std::shared_ptr<RendererImpl> mRenderer;
-
-    EventCallback mOnEvent;
-
-    // Protects the state recalculation and writes to mState.
-    //
-    // The correctness condition governing the relationship between mFromGuest,
-    // mToGuest, and mState is that we can't reach a potentially stable state
-    // (i.e., at the end of a set of invocations of publically-visible
-    // operations) in which either:
-    //  - mFromGuest().size() < mFromGuest.capacity(), yet state does not have
-    //    State::CanWrite, or
-    //  - mToGuest().size > 0, yet state does not have State::CanRead.
-    // Clients assume they can poll currentState() and have the indicate whether
-    // a write or read might possibly succeed, and this correctness condition
-    // makes that assumption valid -- if a write or read might succeed,
-    // mState is required to eventually indicate this.
-    android::base::Lock mStateLock;
-    std::atomic<State> mState;
-
-    bool mStopped = false;
-
-    android::base::MessageChannel<Buffer, 1024> mFromGuest;
-    android::base::MessageChannel<Buffer, 16> mToGuest;
-
-    Buffer mFromGuestBuffer;
-    size_t mFromGuestBufferLeft = 0;
+    // A single lock to protect the state and the two buffer queues at the
+    // same time. NOTE: This needs to appear before the BufferQueue instances.
+    mutable android::base::Lock mLock;
+    State mState = State::Empty;
+    State mWantedEvents = State::Empty;
+    BufferQueue mFromGuest;
+    BufferQueue mToGuest;
 };
 
 }  // namespace emugl
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/RenderThread.cpp b/distrib/android-emugl/host/libs/libOpenglRender/RenderThread.cpp
index 5ca41da..a34c985 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/RenderThread.cpp
+++ b/distrib/android-emugl/host/libs/libOpenglRender/RenderThread.cpp
@@ -58,16 +58,16 @@
 }
 
 intptr_t RenderThread::main() {
+    ChannelStream stream(mChannel, RenderChannel::Buffer::kSmallSize);
+
     uint32_t flags = 0;
-    if (mChannel->readFromGuest(reinterpret_cast<char*>(&flags),
-                                sizeof(flags), true) != sizeof(flags)) {
+    if (stream.read(&flags, sizeof(flags)) != sizeof(flags)) {
         return 0;
     }
 
     // |flags| used to have something, now they're not used.
     (void)flags;
 
-    ChannelStream stream(mChannel, RenderChannel::Buffer::kSmallSize);
     RenderThreadInfo tInfo;
     ChecksumCalculatorThreadInfo tChecksumInfo;
     ChecksumCalculator& checksumCalc = tChecksumInfo.get();
diff --git a/distrib/android-emugl/host/libs/libOpenglRender/RendererImpl.cpp b/distrib/android-emugl/host/libs/libOpenglRender/RendererImpl.cpp
index ff14aac..68d646f 100644
--- a/distrib/android-emugl/host/libs/libOpenglRender/RendererImpl.cpp
+++ b/distrib/android-emugl/host/libs/libOpenglRender/RendererImpl.cpp
@@ -92,7 +92,7 @@
 
     for (const auto& t : threads) {
         if (const auto channel = t.second.lock()) {
-            channel->forceStop();
+            channel->stopFromHost();
         }
     }
     // We're stopping the renderer, so there's no need to clean up resources
@@ -106,8 +106,7 @@
 }
 
 RenderChannelPtr RendererImpl::createRenderChannel() {
-    const auto channel =
-            std::make_shared<RenderChannelImpl>(shared_from_this());
+    const auto channel = std::make_shared<RenderChannelImpl>();
 
     std::unique_ptr<RenderThread> rt(RenderThread::create(
             shared_from_this(), channel));