/*
 * Copyright (C) 2012 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/*
 * Encapsulates exchange protocol between the emulator, and an Android device
 * that is connected to the host via USB. The communication is established over
 * a TCP port forwarding, enabled by ADB.
 */

#include "android/async-socket.h"

#include "android/async-socket-connector.h"
#include "android/utils/debug.h"
#include "android/utils/eintr_wrapper.h"
#include "android/utils/iolooper.h"
#include "android/utils/panic.h"

#include <assert.h>

#define  E(...)    derror(__VA_ARGS__)
#define  W(...)    dwarning(__VA_ARGS__)
#define  D(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#define  D_ACTIVE  VERBOSE_CHECK(asyncsocket)

#define TRACE_ON    0

#if TRACE_ON
#define  T(...)    VERBOSE_PRINT(asyncsocket,__VA_ARGS__)
#else
#define  T(...)
#endif

/********************************************************************************
 *                  Asynchronous Socket internal API declarations
 *******************************************************************************/

/* Gets socket's address string. */
static const char* _async_socket_string(AsyncSocket* as);

/* Gets socket's looper. */
static Looper* _async_socket_get_looper(AsyncSocket* as);

/* Handler for the I/O time out.
 * Param:
 *  as - Asynchronous socket for the I/O.
 *  asio - Desciptor for the timed out I/O.
 */
static AsyncIOAction _async_socket_io_timed_out(AsyncSocket* as,
                                                AsyncSocketIO* asio);

/********************************************************************************
 *                  Asynchronous Socket Reader / Writer
 *******************************************************************************/

struct AsyncSocketIO {
    /* Next I/O in the reader, or writer list. */
    AsyncSocketIO*      next;
    /* Asynchronous socket for this I/O. */
    AsyncSocket*        as;
    /* Timer used for time outs on this I/O. */
    LoopTimer*          timer;
    /* An opaque pointer associated with this I/O. */
    void*               io_opaque;
    /* Buffer where to read / write data. */
    uint8_t*            buffer;
    /* Bytes to transfer through the socket for this I/O. */
    uint32_t            to_transfer;
    /* Bytes thransferred through the socket in this I/O. */
    uint32_t            transferred;
    /* I/O callback for this I/O. */
    on_as_io_cb         on_io;
    /* I/O type selector: 1 - read, 0 - write. */
    int                 is_io_read;
    /* State of the I/O. */
    AsyncIOState        state;
    /* Number of outstanding references to the I/O. */
    int                 ref_count;
    /* Deadline for this I/O */
    Duration            deadline;
};

/*
 * Recycling I/O instances.
 * Since AsyncSocketIO instances are not that large, it makes sence to recycle
 * them for faster allocation, rather than allocating and freeing them for each
 * I/O on the socket.
 */

/* List of recycled I/O descriptors. */
static AsyncSocketIO* _asio_recycled    = NULL;
/* Number of I/O descriptors that are recycled in the _asio_recycled list. */
static int _recycled_asio_count         = 0;
/* Maximum number of I/O descriptors that can be recycled. */
static const int _max_recycled_asio_num = 32;

/* Handler for an I/O time-out timer event.
 * When this routine is invoked, it indicates that a time out has occurred on an
 * I/O.
 * Param:
 *  opaque - AsyncSocketIO instance representing the timed out I/O.
 */
static void _on_async_socket_io_timed_out(void* opaque, LoopTimer* timer);

/* Creates new I/O descriptor.
 * Param:
 *  as - Asynchronous socket for the I/O.
 *  is_io_read - I/O type selector: 1 - read, 0 - write.
 *  buffer, len - Reader / writer buffer address.
 *  io_cb - Callback for this reader / writer.
 *  io_opaque - An opaque pointer associated with the I/O.
 *  deadline - Deadline to complete the I/O.
 * Return:
 *  Initialized AsyncSocketIO instance.
 */
static AsyncSocketIO*
_async_socket_rw_new(AsyncSocket* as,
                     int is_io_read,
                     void* buffer,
                     uint32_t len,
                     on_as_io_cb io_cb,
                     void* io_opaque,
                     Duration deadline)
{
    /* Lookup in the recycler first. */
    AsyncSocketIO* asio = _asio_recycled;
    if (asio != NULL) {
        /* Pull the descriptor from recycler. */
        _asio_recycled = asio->next;
        _recycled_asio_count--;
    } else {
        /* No recycled descriptors. Allocate new one. */
        ANEW0(asio);
    }

    asio->next          = NULL;
    asio->as            = as;
    asio->is_io_read    = is_io_read;
    asio->buffer        = (uint8_t*)buffer;
    asio->to_transfer   = len;
    asio->transferred   = 0;
    asio->on_io         = io_cb;
    asio->io_opaque     = io_opaque;
    asio->state         = ASIO_STATE_QUEUED;
    asio->ref_count     = 1;
    asio->deadline      = deadline;
    asio->timer = loopTimer_new(
            _async_socket_get_looper(as),
            _on_async_socket_io_timed_out,
            asio);
    loopTimer_startAbsolute(asio->timer, deadline);

    /* Reference socket that is holding this I/O. */
    async_socket_reference(as);

    T("ASocket %s: %s I/O descriptor %p is created for %d bytes of data",
      _async_socket_string(as), is_io_read ? "READ" : "WRITE", asio, len);

    return asio;
}

/* Destroys and frees I/O descriptor. */
static void
_async_socket_io_free(AsyncSocketIO* asio)
{
    AsyncSocket* const as = asio->as;

    T("ASocket %s: %s I/O descriptor %p is destroyed.",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);

    loopTimer_free(asio->timer);

    /* Try to recycle it first, and free the memory if recycler is full. */
    if (_recycled_asio_count < _max_recycled_asio_num) {
        asio->next = _asio_recycled;
        _asio_recycled = asio;
        _recycled_asio_count++;
    } else {
        AFREE(asio);
    }

    /* Release socket that is holding this I/O. */
    async_socket_release(as);
}

/* An I/O has been finished and its descriptor is about to be discarded. */
static void
_async_socket_io_finished(AsyncSocketIO* asio)
{
    /* Notify the client of the I/O that I/O is finished. */
    asio->on_io(asio->io_opaque, asio, ASIO_STATE_FINISHED);
}

int
async_socket_io_reference(AsyncSocketIO* asio)
{
    assert(asio->ref_count > 0);
    asio->ref_count++;
    return asio->ref_count;
}

int
async_socket_io_release(AsyncSocketIO* asio)
{
    assert(asio->ref_count > 0);
    asio->ref_count--;
    if (asio->ref_count == 0) {
        _async_socket_io_finished(asio);
        /* Last reference has been dropped. Destroy this object. */
        _async_socket_io_free(asio);
        return 0;
    }
    return asio->ref_count;
}

/* Creates new asynchronous socket reader.
 * Param:
 *  as - Asynchronous socket for the reader.
 *  buffer, len - Reader's buffer.
 *  io_cb - Reader's callback.
 *  reader_opaque - An opaque pointer associated with the reader.
 *  deadline - Deadline to complete the operation.
 * Return:
 *  An initialized AsyncSocketIO intance.
 */
static AsyncSocketIO*
_async_socket_reader_new(AsyncSocket* as,
                         void* buffer,
                         uint32_t len,
                         on_as_io_cb io_cb,
                         void* reader_opaque,
                         Duration deadline)
{
    AsyncSocketIO* const asio = _async_socket_rw_new(as, 1, buffer, len, io_cb,
                                                     reader_opaque, deadline);
    return asio;
}

/* Creates new asynchronous socket writer.
 * Param:
 *  as - Asynchronous socket for the writer.
 *  buffer, len - Writer's buffer.
 *  io_cb - Writer's callback.
 *  writer_opaque - An opaque pointer associated with the writer.
 *  deadline - Deadline to complete the operation.
 * Return:
 *  An initialized AsyncSocketIO intance.
 */
static AsyncSocketIO*
_async_socket_writer_new(AsyncSocket* as,
                         const void* buffer,
                         uint32_t len,
                         on_as_io_cb io_cb,
                         void* writer_opaque,
                         Duration deadline)
{
    AsyncSocketIO* const asio = _async_socket_rw_new(as, 0, (void*)buffer, len,
                                                     io_cb, writer_opaque,
                                                     deadline);
    return asio;
}

/* I/O timed out. */
static void
_on_async_socket_io_timed_out(void* opaque, LoopTimer* timer)
{
    AsyncSocketIO* const asio = (AsyncSocketIO*)opaque;
    AsyncSocket* const as = asio->as;

    D("ASocket %s: %s I/O with deadline %lld has timed out at %lld",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
      asio->deadline, async_socket_deadline(as, 0));

    /* Reference while in callback. */
    async_socket_io_reference(asio);
    _async_socket_io_timed_out(asio->as, asio);
    async_socket_io_release(asio);
}

/********************************************************************************
 *                 Public Asynchronous Socket I/O API
 *******************************************************************************/

AsyncSocket*
async_socket_io_get_socket(const AsyncSocketIO* asio)
{
    async_socket_reference(asio->as);
    return asio->as;
}

void
async_socket_io_cancel_time_out(AsyncSocketIO* asio)
{
    loopTimer_stop(asio->timer);
}

void*
async_socket_io_get_io_opaque(const AsyncSocketIO* asio)
{
    return asio->io_opaque;
}

void*
async_socket_io_get_client_opaque(const AsyncSocketIO* asio)
{
    return async_socket_get_client_opaque(asio->as);
}

void*
async_socket_io_get_buffer_info(const AsyncSocketIO* asio,
                                uint32_t* transferred,
                                uint32_t* to_transfer)
{
    if (transferred != NULL) {
        *transferred = asio->transferred;
    }
    if (to_transfer != NULL) {
        *to_transfer = asio->to_transfer;
    }
    return asio->buffer;
}

void*
async_socket_io_get_buffer(const AsyncSocketIO* asio)
{
    return asio->buffer;
}

uint32_t
async_socket_io_get_transferred(const AsyncSocketIO* asio)
{
    return asio->transferred;
}

uint32_t
async_socket_io_get_to_transfer(const AsyncSocketIO* asio)
{
    return asio->to_transfer;
}

int
async_socket_io_is_read(const AsyncSocketIO* asio)
{
    return asio->is_io_read;
}

/********************************************************************************
 *                      Asynchronous Socket internals
 *******************************************************************************/

struct AsyncSocket {
    /* TCP address for the socket. */
    SockAddress         address;
    /* Connection callback for this socket. */
    on_as_connection_cb on_connection;
    /* An opaque pointer associated with this socket by the client. */
    void*               client_opaque;
    /* I/O looper for asynchronous I/O on the socket. */
    Looper*             looper;
    /* I/O descriptor for asynchronous I/O on the socket. */
    LoopIo*             io;
    /* Timer to use for reconnection attempts. */
    LoopTimer*          reconnect_timer;
    /* Head of the list of the active readers. */
    AsyncSocketIO*      readers_head;
    /* Tail of the list of the active readers. */
    AsyncSocketIO*      readers_tail;
    /* Head of the list of the active writers. */
    AsyncSocketIO*      writers_head;
    /* Tail of the list of the active writers. */
    AsyncSocketIO*      writers_tail;
    /* Socket's file descriptor. */
    int                 fd;
    /* Timeout to use for reconnection attempts. */
    int                 reconnect_to;
    /* Number of outstanding references to the socket. */
    int                 ref_count;
    /* Flags whether (1) or not (0) socket owns the looper. */
    int                 owns_looper;
};

static const char*
_async_socket_string(AsyncSocket* as)
{
    return sock_address_to_string(&as->address);
}

static Looper*
_async_socket_get_looper(AsyncSocket* as)
{
    return as->looper;
}

/* Pulls first reader out of the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  First I/O pulled out of the list, or NULL if there are no I/O in the list.
 *  Note that the caller is responsible for releasing the I/O object returned
 *  from this routine.
 */
static AsyncSocketIO*
_async_socket_pull_first_io(AsyncSocket* as,
                            AsyncSocketIO** list_head,
                            AsyncSocketIO** list_tail)
{
    AsyncSocketIO* const ret = *list_head;
    if (ret != NULL) {
        *list_head = ret->next;
        ret->next = NULL;
        if (*list_head == NULL) {
            *list_tail = NULL;
        }
    }
    return ret;
}

/* Pulls first reader out of the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  First reader pulled out of the list, or NULL if there are no readers in the
 *  list.
 *  Note that the caller is responsible for releasing the I/O object returned
 *  from this routine.
 */
static AsyncSocketIO*
_async_socket_pull_first_reader(AsyncSocket* as)
{
    return _async_socket_pull_first_io(as, &as->readers_head, &as->readers_tail);
}

/* Pulls first writer out of the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  First writer pulled out of the list, or NULL if there are no writers in the
 *  list.
 *  Note that the caller is responsible for releasing the I/O object returned
 *  from this routine.
 */
static AsyncSocketIO*
_async_socket_pull_first_writer(AsyncSocket* as)
{
    return _async_socket_pull_first_io(as, &as->writers_head, &as->writers_tail);
}

/* Removes an I/O descriptor from a list of active I/O.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  list_head, list_tail - Pointers to the list head and tail.
 *  io - I/O to remove.
 * Return:
 *  Boolean: 1 if I/O has been removed, or 0 if I/O has not been found in the list.
 */
static int
_async_socket_remove_io(AsyncSocket* as,
                        AsyncSocketIO** list_head,
                        AsyncSocketIO** list_tail,
                        AsyncSocketIO* io)
{
    AsyncSocketIO* prev = NULL;

    while (*list_head != NULL && io != *list_head) {
        prev = *list_head;
        list_head = &((*list_head)->next);
    }
    if (*list_head == NULL) {
        D("%s: I/O %p is not found in the list for socket '%s'",
          __FUNCTION__, io, _async_socket_string(as));
        return 0;
    }

    *list_head = io->next;
    if (prev != NULL) {
        prev->next = io->next;
    }
    if (*list_tail == io) {
        *list_tail = prev;
    }

    /* Release I/O adjusting reference added when I/O has been saved in the list. */
    async_socket_io_release(io);

    return 1;
}

/* Advances to the next I/O in the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  list_head, list_tail - Pointers to the list head and tail.
 */
static void
_async_socket_advance_io(AsyncSocket* as,
                         AsyncSocketIO** list_head,
                         AsyncSocketIO** list_tail)
{
    AsyncSocketIO* first_io = *list_head;
    if (first_io != NULL) {
        *list_head = first_io->next;
        first_io->next = NULL;
    }
    if (*list_head == NULL) {
        *list_tail = NULL;
    }
    if (first_io != NULL) {
        /* Release I/O removed from the head of the list. */
        async_socket_io_release(first_io);
    }
}

/* Advances to the next reader in the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_advance_reader(AsyncSocket* as)
{
    _async_socket_advance_io(as, &as->readers_head, &as->readers_tail);
}

/* Advances to the next writer in the list.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_advance_writer(AsyncSocket* as)
{
    _async_socket_advance_io(as, &as->writers_head, &as->writers_tail);
}

/* Completes an I/O.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  asio - I/O to complete.
 * Return:
 *  One of AsyncIOAction values.
 */
static AsyncIOAction
_async_socket_complete_io(AsyncSocket* as, AsyncSocketIO* asio)
{
    T("ASocket %s: %s I/O %p is completed.",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);

    /* Stop the timer. */
    async_socket_io_cancel_time_out(asio);

    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_SUCCEEDED);
}

/* Timeouts an I/O.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  asio - An I/O that has timed out.
 * Return:
 *  One of AsyncIOAction values.
 */
static AsyncIOAction
_async_socket_io_timed_out(AsyncSocket* as, AsyncSocketIO* asio)
{
    T("ASocket %s: %s I/O %p with deadline %lld has timed out at %lld",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
      asio->deadline, async_socket_deadline(as, 0));

    /* Report to the client. */
    const AsyncIOAction action = asio->on_io(asio->io_opaque, asio,
                                             ASIO_STATE_TIMED_OUT);

    /* Remove the I/O from a list of active I/O for actions other than retry. */
    if (action != ASIO_ACTION_RETRY) {
        if (asio->is_io_read) {
            _async_socket_remove_io(as, &as->readers_head, &as->readers_tail, asio);
        } else {
            _async_socket_remove_io(as, &as->writers_head, &as->writers_tail, asio);
        }
    }

    return action;
}

/* Cancels an I/O.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  asio - An I/O to cancel.
 * Return:
 *  One of AsyncIOAction values.
 */
static AsyncIOAction
_async_socket_cancel_io(AsyncSocket* as, AsyncSocketIO* asio)
{
    T("ASocket %s: %s I/O %p is cancelled.",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio);

    /* Stop the timer. */
    async_socket_io_cancel_time_out(asio);

    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_CANCELLED);
}

/* Reports an I/O failure.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  asio - An I/O that has failed. Can be NULL for general failures.
 *  failure - Failure (errno) that has occurred.
 * Return:
 *  One of AsyncIOAction values.
 */
static AsyncIOAction
_async_socket_io_failure(AsyncSocket* as, AsyncSocketIO* asio, int failure)
{
    T("ASocket %s: %s I/O %p has failed: %d -> %s",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE", asio,
      failure, strerror(failure));

    /* Stop the timer. */
    async_socket_io_cancel_time_out(asio);

    errno = failure;
    return asio->on_io(asio->io_opaque, asio, ASIO_STATE_FAILED);
}

/* Cancels all the active socket readers.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_cancel_readers(AsyncSocket* as)
{
    while (as->readers_head != NULL) {
        AsyncSocketIO* const to_cancel = _async_socket_pull_first_reader(as);
        /* We ignore action returned from the cancellation callback, since we're
         * in a disconnected state here. */
        _async_socket_cancel_io(as, to_cancel);
        async_socket_io_release(to_cancel);
    }
}

/* Cancels all the active socket writers.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_cancel_writers(AsyncSocket* as)
{
    while (as->writers_head != NULL) {
        AsyncSocketIO* const to_cancel = _async_socket_pull_first_writer(as);
        /* We ignore action returned from the cancellation callback, since we're
         * in a disconnected state here. */
        _async_socket_cancel_io(as, to_cancel);
        async_socket_io_release(to_cancel);
    }
}

/* Cancels all the I/O on the socket. */
static void
_async_socket_cancel_all_io(AsyncSocket* as)
{
    /* Stop the reconnection timer. */
    loopTimer_stop(as->reconnect_timer);

    /* Stop read / write on the socket. */
    loopIo_dontWantWrite(as->io);
    loopIo_dontWantRead(as->io);

    /* Cancel active readers and writers. */
    _async_socket_cancel_readers(as);
    _async_socket_cancel_writers(as);
}

/* Closes socket handle used by the async socket.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_close_socket(AsyncSocket* as)
{
    if (as->fd >= 0) {
        T("ASocket %s: Socket handle %d is closed.",
          _async_socket_string(as), as->fd);
        loopIo_free(as->io);
        socket_close(as->fd);
        as->fd = -1;
    }
}

/* Destroys AsyncSocket instance.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_async_socket_free(AsyncSocket* as)
{
    if (as != NULL) {
        T("ASocket %s: Socket descriptor is destroyed.", _async_socket_string(as));

        /* Close socket. */
        _async_socket_close_socket(as);

        /* Free allocated resources. */
        if (as->looper != NULL) {
            loopTimer_free(as->reconnect_timer);
            if (as->owns_looper) {
                looper_free(as->looper);
            }
        }
        sock_address_done(&as->address);
        AFREE(as);
    }
}

/* Starts reconnection attempts after connection has been lost.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  to - Milliseconds to wait before reconnection attempt.
 */
static void
_async_socket_reconnect(AsyncSocket* as, int to)
{
    T("ASocket %s: reconnecting in %dms...", _async_socket_string(as), to);

    /* Make sure that no I/O is active, and socket is closed before we
     * reconnect. */
    _async_socket_cancel_all_io(as);

    /* Set the timer for reconnection attempt. */
    loopTimer_startRelative(as->reconnect_timer, to);
}

/********************************************************************************
 *                      Asynchronous Socket callbacks
 *******************************************************************************/

/* A callback that is invoked when socket gets disconnected.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
static void
_on_async_socket_disconnected(AsyncSocket* as)
{
    /* Save error to restore it for the client's callback. */
    const int save_errno = errno;
    AsyncIOAction action = ASIO_ACTION_ABORT;

    D("ASocket %s: Disconnected.", _async_socket_string(as));

    /* Cancel all the I/O on this socket. */
    _async_socket_cancel_all_io(as);

    /* Close the socket. */
    _async_socket_close_socket(as);

    /* Restore errno, and invoke client's callback. */
    errno = save_errno;
    action = as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);

    if (action == ASIO_ACTION_RETRY) {
        /* Client requested reconnection. */
        _async_socket_reconnect(as, as->reconnect_to);
    }
}

/* A callback that is invoked on socket's I/O failure.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  asio - Descriptor for the failed I/O. Can be NULL for general failures.
 */
static AsyncIOAction
_on_async_socket_failure(AsyncSocket* as, AsyncSocketIO* asio)
{
    D("ASocket %s: %s I/O failure: %d -> %s",
      _async_socket_string(as), asio->is_io_read ? "READ" : "WRITE",
      errno, strerror(errno));

    /* Report the failure. */
    return _async_socket_io_failure(as, asio, errno);
}

/* A callback that is invoked when there is data available to read.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  0 on success, or -1 on failure. Failure returned from this routine will
 *  skip writes (if awailable) behind this read.
 */
static int
_on_async_socket_recv(AsyncSocket* as)
{
    AsyncIOAction action;

    /* Get current reader. */
    AsyncSocketIO* const asr = as->readers_head;
    if (asr == NULL) {
        D("ASocket %s: No reader is available.", _async_socket_string(as));
        loopIo_dontWantRead(as->io);
        return 0;
    }

    /* Reference the reader while we're working with it in this callback. */
    async_socket_io_reference(asr);

    /* Bump I/O state, and inform the client that I/O is in progress. */
    if (asr->state == ASIO_STATE_QUEUED) {
        asr->state = ASIO_STATE_STARTED;
    } else {
        asr->state = ASIO_STATE_CONTINUES;
    }
    action = asr->on_io(asr->io_opaque, asr, asr->state);
    if (action == ASIO_ACTION_ABORT) {
        D("ASocket %s: Read is aborted by the client.", _async_socket_string(as));
        /* Move on to the next reader. */
        _async_socket_advance_reader(as);
        /* Lets see if there are still active readers, and enable, or disable
         * read I/O callback accordingly. */
        if (as->readers_head != NULL) {
            loopIo_wantRead(as->io);
        } else {
            loopIo_dontWantRead(as->io);
        }
        async_socket_io_release(asr);
        return 0;
    }

    /* Read next chunk of data. */
    int res = HANDLE_EINTR(
            socket_recv(as->fd,
                        asr->buffer + asr->transferred,
                        asr->to_transfer - asr->transferred));
    if (res == 0) {
        /* Socket has been disconnected. */
        errno = ECONNRESET;
        _on_async_socket_disconnected(as);
        async_socket_io_release(asr);
        return -1;
    }

    if (res < 0) {
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            /* Yield to writes behind this read. */
            loopIo_wantRead(as->io);
            async_socket_io_release(asr);
            return 0;
        }

        /* An I/O error. */
        action = _on_async_socket_failure(as, asr);
        if (action != ASIO_ACTION_RETRY) {
            D("ASocket %s: Read is aborted on failure.", _async_socket_string(as));
            /* Move on to the next reader. */
            _async_socket_advance_reader(as);
            /* Lets see if there are still active readers, and enable, or disable
             * read I/O callback accordingly. */
            if (as->readers_head != NULL) {
                loopIo_wantRead(as->io);
            } else {
                loopIo_dontWantRead(as->io);
            }
        }
        async_socket_io_release(asr);
        return -1;
    }

    /* Update the reader's descriptor. */
    asr->transferred += res;
    if (asr->transferred == asr->to_transfer) {
        /* This read is completed. Move on to the next reader. */
        _async_socket_advance_reader(as);

        /* Notify reader completion. */
        _async_socket_complete_io(as, asr);
    }

    /* Lets see if there are still active readers, and enable, or disable read
     * I/O callback accordingly. */
    if (as->readers_head != NULL) {
        loopIo_wantRead(as->io);
    } else {
        loopIo_dontWantRead(as->io);
    }

    async_socket_io_release(asr);

    return 0;
}

/* A callback that is invoked when there is data available to write.
 * Param:
 *  as - Initialized AsyncSocket instance.
 * Return:
 *  0 on success, or -1 on failure. Failure returned from this routine will
 *  skip reads (if awailable) behind this write.
 */
static int
_on_async_socket_send(AsyncSocket* as)
{
    AsyncIOAction action;

    /* Get current writer. */
    AsyncSocketIO* const asw = as->writers_head;
    if (asw == NULL) {
        D("ASocket %s: No writer is available.", _async_socket_string(as));
        loopIo_dontWantWrite(as->io);
        return 0;
    }

    /* Reference the writer while we're working with it in this callback. */
    async_socket_io_reference(asw);

    /* Bump I/O state, and inform the client that I/O is in progress. */
    if (asw->state == ASIO_STATE_QUEUED) {
        asw->state = ASIO_STATE_STARTED;
    } else {
        asw->state = ASIO_STATE_CONTINUES;
    }
    action = asw->on_io(asw->io_opaque, asw, asw->state);
    if (action == ASIO_ACTION_ABORT) {
        D("ASocket %s: Write is aborted by the client.", _async_socket_string(as));
        /* Move on to the next writer. */
        _async_socket_advance_writer(as);
        /* Lets see if there are still active writers, and enable, or disable
         * write I/O callback accordingly. */
        if (as->writers_head != NULL) {
            loopIo_wantWrite(as->io);
        } else {
            loopIo_dontWantWrite(as->io);
        }
        async_socket_io_release(asw);
        return 0;
    }

    /* Write next chunk of data. */
    int res = HANDLE_EINTR(
            socket_send(as->fd,
                        asw->buffer + asw->transferred,
                        asw->to_transfer - asw->transferred));
    if (res == 0) {
        /* Socket has been disconnected. */
        errno = ECONNRESET;
        _on_async_socket_disconnected(as);
        async_socket_io_release(asw);
        return -1;
    }

    if (res < 0) {
        if (errno == EWOULDBLOCK || errno == EAGAIN) {
            /* Yield to reads behind this write. */
            loopIo_wantWrite(as->io);
            async_socket_io_release(asw);
            return 0;
        }

        /* An I/O error. */
        action = _on_async_socket_failure(as, asw);
        if (action != ASIO_ACTION_RETRY) {
            D("ASocket %s: Write is aborted on failure.", _async_socket_string(as));
            /* Move on to the next writer. */
            _async_socket_advance_writer(as);
            /* Lets see if there are still active writers, and enable, or disable
             * write I/O callback accordingly. */
            if (as->writers_head != NULL) {
                loopIo_wantWrite(as->io);
            } else {
                loopIo_dontWantWrite(as->io);
            }
        }
        async_socket_io_release(asw);
        return -1;
    }

    /* Update the writer descriptor. */
    asw->transferred += res;
    if (asw->transferred == asw->to_transfer) {
        /* This write is completed. Move on to the next writer. */
        _async_socket_advance_writer(as);

        /* Notify writer completion. */
        _async_socket_complete_io(as, asw);
    }

    /* Lets see if there are still active writers, and enable, or disable write
     * I/O callback accordingly. */
    if (as->writers_head != NULL) {
        loopIo_wantWrite(as->io);
    } else {
        loopIo_dontWantWrite(as->io);
    }

    async_socket_io_release(asw);

    return 0;
}

/* A callback that is invoked when an I/O is available on socket.
 * Param:
 *  as - Initialized AsyncSocket instance.
 *  fd - Socket's file descriptor.
 *  events - LOOP_IO_READ | LOOP_IO_WRITE bitmask.
 */
static void
_on_async_socket_io(void* opaque, int fd, unsigned events)
{
    AsyncSocket* const as = (AsyncSocket*)opaque;

    /* Reference the socket while we're working with it in this callback. */
    async_socket_reference(as);

    if ((events & LOOP_IO_READ) != 0) {
        if (_on_async_socket_recv(as) != 0) {
            async_socket_release(as);
            return;
        }
    }

    if ((events & LOOP_IO_WRITE) != 0) {
        if (_on_async_socket_send(as) != 0) {
            async_socket_release(as);
            return;
        }
    }

    async_socket_release(as);
}

/* A callback that is invoked by asynchronous socket connector on connection
 *  events.
 * Param:
 *  opaque - Initialized AsyncSocket instance.
 *  connector - Connector that is used to connect this socket.
 *  event - Connection event.
 * Return:
 *  One of AsyncIOAction values.
 */
static AsyncIOAction
_on_connector_events(void* opaque,
                     AsyncSocketConnector* connector,
                     AsyncIOState event)
{
    AsyncIOAction action;
    AsyncSocket* const as = (AsyncSocket*)opaque;

    /* Reference the socket while we're working with it in this callback. */
    async_socket_reference(as);

    if (event == ASIO_STATE_SUCCEEDED) {
        /* Accept the connection. */
        as->fd = async_socket_connector_pull_fd(connector);
        as->io = loopIo_new(as->looper, as->fd, _on_async_socket_io, as);
    }

    /* Invoke client's callback. */
    action = as->on_connection(as->client_opaque, as, event);
    if (event == ASIO_STATE_SUCCEEDED && action != ASIO_ACTION_DONE) {
        /* For whatever reason the client didn't want to keep this connection.
         * Close it. */
        D("ASocket %s: Connection is discarded by the client.",
          _async_socket_string(as));
        _async_socket_close_socket(as);
    }

    if (action != ASIO_ACTION_RETRY) {
        async_socket_connector_release(connector);
    }

    async_socket_release(as);

    return action;
}

/* Timer callback invoked to reconnect the lost connection.
 * Param:
 *  as - Initialized AsyncSocket instance.
 */
void
_on_async_socket_reconnect(void* opaque, LoopTimer* timer)
{
    AsyncSocket* as = (AsyncSocket*)opaque;

    /* Reference the socket while we're working with it in this callback. */
    async_socket_reference(as);
    async_socket_connect(as, as->reconnect_to);
    async_socket_release(as);
}


/********************************************************************************
 *                  Android Device Socket public API
 *******************************************************************************/

AsyncSocket*
async_socket_new(int port,
                 int reconnect_to,
                 on_as_connection_cb client_cb,
                 void* client_opaque,
                 Looper* looper)
{
    AsyncSocket* as;

    if (client_cb == NULL) {
        E("Invalid client_cb parameter");
        return NULL;
    }

    ANEW0(as);

    as->fd = -1;
    as->client_opaque = client_opaque;
    as->on_connection = client_cb;
    as->readers_head = as->readers_tail = NULL;
    as->reconnect_to = reconnect_to;
    as->ref_count = 1;
    sock_address_init_inet(&as->address, SOCK_ADDRESS_INET_LOOPBACK, port);
    if (looper == NULL) {
        as->looper = looper_newGeneric();
        if (as->looper == NULL) {
            E("Unable to create I/O looper for async socket '%s'",
              _async_socket_string(as));
            client_cb(client_opaque, as, ASIO_STATE_FAILED);
            _async_socket_free(as);
            return NULL;
        }
        as->owns_looper = 1;
    } else {
        as->looper = looper;
        as->owns_looper = 0;
    }

    as->reconnect_timer = loopTimer_new(
            as->looper, _on_async_socket_reconnect, as);

    T("ASocket %s: Descriptor is created.", _async_socket_string(as));

    return as;
}

int
async_socket_reference(AsyncSocket* as)
{
    assert(as->ref_count > 0);
    as->ref_count++;
    return as->ref_count;
}

int
async_socket_release(AsyncSocket* as)
{
    assert(as->ref_count > 0);
    as->ref_count--;
    if (as->ref_count == 0) {
        /* Last reference has been dropped. Destroy this object. */
        _async_socket_cancel_all_io(as);
        _async_socket_free(as);
        return 0;
    }
    return as->ref_count;
}

void
async_socket_connect(AsyncSocket* as, int retry_to)
{
    T("ASocket %s: Handling connection request for %dms...",
      _async_socket_string(as), retry_to);

    AsyncSocketConnector* const connector =
        async_socket_connector_new(&as->address, retry_to, _on_connector_events,
                                   as, as->looper);
    if (connector != NULL) {
        async_socket_connector_connect(connector);
    } else {
        as->on_connection(as->client_opaque, as, ASIO_STATE_FAILED);
    }
}

void
async_socket_disconnect(AsyncSocket* as)
{
    T("ASocket %s: Handling disconnection request...", _async_socket_string(as));

    if (as != NULL) {
        _async_socket_cancel_all_io(as);
        _async_socket_close_socket(as);
    }
}

void
async_socket_reconnect(AsyncSocket* as, int retry_to)
{
    T("ASocket %s: Handling reconnection request for %dms...",
      _async_socket_string(as), retry_to);

    _async_socket_cancel_all_io(as);
    _async_socket_close_socket(as);
    _async_socket_reconnect(as, retry_to);
}

void
async_socket_read_abs(AsyncSocket* as,
                      void* buffer, uint32_t len,
                      on_as_io_cb reader_cb,
                      void* reader_opaque,
                      Duration deadline)
{
    T("ASocket %s: Handling read for %d bytes with deadline %lld...",
      _async_socket_string(as), len, deadline);

    AsyncSocketIO* const asr =
        _async_socket_reader_new(as, buffer, len, reader_cb, reader_opaque,
                                 deadline);
    if (async_socket_is_connected(as)) {
        /* Add new reader to the list. Note that we use initial reference from I/O
         * 'new' routine as "in the list" reference counter. */
        if (as->readers_head == NULL) {
            as->readers_head = as->readers_tail = asr;
        } else {
            as->readers_tail->next = asr;
            as->readers_tail = asr;
        }
        loopIo_wantRead(as->io);
    } else {
        D("ASocket %s: Read on a disconnected socket.", _async_socket_string(as));
        errno = ECONNRESET;
        reader_cb(reader_opaque, asr, ASIO_STATE_FAILED);
        async_socket_io_release(asr);
    }
}

void
async_socket_read_rel(AsyncSocket* as,
                      void* buffer, uint32_t len,
                      on_as_io_cb reader_cb,
                      void* reader_opaque,
                      int to)
{
    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
                                    DURATION_INFINITE;
    async_socket_read_abs(as, buffer, len, reader_cb, reader_opaque, dl);
}

void
async_socket_write_abs(AsyncSocket* as,
                       const void* buffer, uint32_t len,
                       on_as_io_cb writer_cb,
                       void* writer_opaque,
                       Duration deadline)
{
    T("ASocket %s: Handling write for %d bytes with deadline %lld...",
      _async_socket_string(as), len, deadline);

    AsyncSocketIO* const asw =
        _async_socket_writer_new(as, buffer, len, writer_cb, writer_opaque,
                                 deadline);
    if (async_socket_is_connected(as)) {
        /* Add new writer to the list. Note that we use initial reference from I/O
         * 'new' routine as "in the list" reference counter. */
        if (as->writers_head == NULL) {
            as->writers_head = as->writers_tail = asw;
        } else {
            as->writers_tail->next = asw;
            as->writers_tail = asw;
        }
        loopIo_wantWrite(as->io);
    } else {
        D("ASocket %s: Write on a disconnected socket.", _async_socket_string(as));
        errno = ECONNRESET;
        writer_cb(writer_opaque, asw, ASIO_STATE_FAILED);
        async_socket_io_release(asw);
    }
}

void
async_socket_write_rel(AsyncSocket* as,
                       const void* buffer, uint32_t len,
                       on_as_io_cb writer_cb,
                       void* writer_opaque,
                       int to)
{
    const Duration dl = (to >= 0) ? looper_now(_async_socket_get_looper(as)) + to :
                                    DURATION_INFINITE;
    async_socket_write_abs(as, buffer, len, writer_cb, writer_opaque, dl);
}

void*
async_socket_get_client_opaque(const AsyncSocket* as)
{
    return as->client_opaque;
}

Duration
async_socket_deadline(AsyncSocket* as, int rel)
{
    return (rel >= 0) ? looper_now(_async_socket_get_looper(as)) + rel :
                        DURATION_INFINITE;
}

int
async_socket_get_port(const AsyncSocket* as)
{
    return sock_address_get_port(&as->address);
}

int
async_socket_is_connected(const AsyncSocket* as)
{
    return as->fd >= 0;
}
