blob: eb1f2205928ffc1613e6c56ca393ca79a1e58ffa [file] [log] [blame]
/*
* Copyright (C) 2012 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Encapsulates exchange protocol between the emulator, and an Android device
* that is connected to the host via USB. The communication is established over
* a TCP port forwarding, enabled by ADB.
*/
#include "android/utils/debug.h"
#include "android/async-socket-connector.h"
#include "android/async-socket.h"
#include "android/sdk-controller-socket.h"
#include "utils/panic.h"
#include "android/iolooper.h"
/* Logging shortcuts: E() forwards to derror(), W() forwards to dwarning(). */
#define E(...) derror(__VA_ARGS__)
#define W(...) dwarning(__VA_ARGS__)
/* Verbose output, and the matching "is verbose output enabled" check, both
* keyed on the sdkctlsocket verbosity tag. */
#define D(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__)
#define D_ACTIVE VERBOSE_CHECK(sdkctlsocket)
/* Compile-time switch for T() tracing. While it is 0, T() expands to nothing,
* so the arguments passed to T() are not evaluated at all. */
#define TRACE_ON 0
#if TRACE_ON
#define T(...) VERBOSE_PRINT(sdkctlsocket,__VA_ARGS__)
#else
#define T(...)
#endif
/* Recycling memory descriptor.
*
* Sits at the very beginning of every block handed out by the recycler
* routines. Both members share the same storage, so only one of them is
* meaningful at any given time, depending on where the block currently is. */
typedef struct SDKCtlRecycled SDKCtlRecycled;
struct SDKCtlRecycled {
union {
/* Next recycled descriptor (while listed in recycler). */
SDKCtlRecycled* next;
/* Allocated memory size (while outside of the recycler). The free routine
* compares it with the socket's recycler_block_size to decide whether the
* block can go back to the recycler. */
uint32_t size;
};
};
/*
 * Types of the data packets sent via SDK controller socket.
 */
/* The packet is a message. */
#define SDKCTL_PACKET_MESSAGE 1
/* The packet is a query. */
#define SDKCTL_PACKET_QUERY 2
/* The packet is a response to a query. */
#define SDKCTL_PACKET_QUERY_RESPONSE 3
/*
 * Types of internal port messages sent via SDK controller socket.
 *
 * Negative values are parenthesized so that each macro always expands to a
 * single operand, whatever expression it is pasted into.
 */
/* Port is connected.
 * This message is sent by SDK controller when the service connects a socket with
 * a port that provides requested emulation functionality.
 */
#define SDKCTL_MSG_PORT_CONNECTED (-1)
/* Port is disconnected.
 * This message is sent by SDK controller when a port that provides requested
 * emulation functionality disconnects from the socket.
 */
#define SDKCTL_MSG_PORT_DISCONNECTED (-2)
/* Port is enabled.
 * This message is sent by SDK controller when a port that provides requested
 * emulation functionality is ready to do the emulation.
 */
#define SDKCTL_MSG_PORT_ENABLED (-3)
/* Port is disabled.
 * This message is sent by SDK controller when a port that provides requested
 * emulation functionality is not ready to do the emulation.
 */
#define SDKCTL_MSG_PORT_DISABLED (-4)
/*
 * Types of internal queries sent via SDK controller socket.
 */
/* Handshake query.
 * This query is sent to SDK controller service as part of the connection
 * protocol implementation.
 */
#define SDKCTL_QUERY_HANDSHAKE (-1)
/********************************************************************************
* SDKCtlPacket declarations
*******************************************************************************/
/* Packet signature value: the ASCII codes of 'S', 'D', 'K', 'C' packed into one
* integer, most significant byte first. It is stored in the header of every
* packet built by this file, and is checked by assertions right before a
* packet is handed to the socket for transmission. */
static const int _sdkctl_packet_sig = 0x53444B43;
/* Data packet descriptor.
*
* All packets, sent and received via SDK controller socket begin with this
* header, with packet data immediately following this header.
*/
typedef struct SDKCtlPacketHeader {
/* Signature. Packets built by this file carry _sdkctl_packet_sig here. */
int signature;
/* Total size of the data to transfer with this packet, including this
* header. The transferred data should immediately follow this header. */
int size;
/* Encodes packet type. See SDKCTL_PACKET_XXX for the list of packet types
* used by SDK controller. */
int type;
} SDKCtlPacketHeader;
/* Packet descriptor, allocated by this API for data packets to be sent to SDK
* controller.
*
* When packet descriptors are allocated by this API, they are allocated large
* enough to contain this header, and packet data to send to the service,
* immediately following this descriptor.
*/
typedef struct SDKCtlPacket {
/* Supports recycling. Don't put anything in front: recycler expects this
* to be the first field in recyclable descriptor. */
SDKCtlRecycled recycling;
/* SDK controller socket that transmits this packet. The packet keeps a
* reference on the socket from allocation until the packet is freed. */
SDKCtlSocket* sdkctl;
/* Number of outstanding references to the packet. Starts at 1 when the packet
* is allocated; the packet is freed when the count drops to 0. */
int ref_count;
/* Common packet header. Packet data immediately follows this header, so it
* must be the last field in SDKCtlPacket descriptor. Transmission starts at
* this field, not at the beginning of the descriptor. */
SDKCtlPacketHeader header;
} SDKCtlPacket;
/********************************************************************************
* SDKCtlDirectPacket declarations
*******************************************************************************/
/* Direct packet descriptor, allocated by this API for direct data packets to be
* sent to SDK controller service on the device.
*
* Direct packets (unlike SDKCtlPacket) don't contain a data buffer, but rather
* reference data allocated by the client. This is useful when client sends large
* amount of data (such as framebuffer updates sent by multi-touch port), and
* regular packet descriptors for such large transfer cannot be obtained from the
* recycler.
*/
struct SDKCtlDirectPacket {
/* Supports recycling. Don't put anything in front: recycler expects this
* to be the first field in recyclable descriptor. */
SDKCtlRecycled recycling;
/* SDKCtlSocket that owns this packet. The packet keeps a reference on the
* socket from allocation until the packet is freed. */
SDKCtlSocket* sdkctl;
/* Packet to send. Points to client's data, which must begin with an
* initialized SDKCtlPacketHeader. Set by sdkctl_direct_packet_send. */
SDKCtlPacketHeader* packet;
/* Callback to invoke on packet transmission events. Set by
* sdkctl_direct_packet_send. */
on_sdkctl_direct_cb on_sent;
/* An opaque pointer to pass to on_sent callback. */
void* on_sent_opaque;
/* Number of outstanding references to the packet. Starts at 1 when the packet
* is allocated; the packet is freed when the count drops to 0. */
int ref_count;
};
/********************************************************************************
* SDKCtlQuery declarations
*******************************************************************************/
/* Query packet descriptor.
*
* All queries, sent and received via SDK controller socket begin with this
* header, with query data immediately following this header.
*/
typedef struct SDKCtlQueryHeader {
/* Data packet header for this query. */
SDKCtlPacketHeader packet;
/* A unique query identifier. This ID is used to track the query in the
* asynchronous environment in which SDK controller socket operates. */
int query_id;
/* Query type. */
int query_type;
} SDKCtlQueryHeader;
/* Query descriptor, allocated by this API for queries to be sent to SDK
* controller service on the device.
*
* When query descriptors are allocated by this API, they are allocated large
* enough to contain this header, and query data to send to the service,
* immediately following this descriptor.
*/
struct SDKCtlQuery {
/* Supports recycling. Don't put anything in front: recycler expects this
* to be the first field in recyclable descriptor. */
SDKCtlRecycled recycling;
/* Next query in the list of active queries. NULL for the last query in the
* list; reset to NULL when the query is removed from the list by one of the
* remove routines. */
SDKCtlQuery* next;
/* A timer to run time out on this query after it has been sent. */
LoopTimer timer[1];
/* Absolute time for this query's deadline. This is the value that query's
* timer is set to after query has been transmitted to the service. */
Duration deadline;
/* SDK controller socket that owns the query. The socket is released when the
* query is freed. */
SDKCtlSocket* sdkctl;
/* A callback to invoke on query state changes. */
on_sdkctl_query_cb query_cb;
/* An opaque pointer associated with this query. Passed as the first argument
* to query_cb. */
void* query_opaque;
/* Points to an address of a buffer where to save query response. */
void** response_buffer;
/* Points to a variable containing size of the response buffer (on the way
* in), or actual query response size (when query is completed). */
uint32_t* response_size;
/* Internal response buffer, allocated if query creator didn't provide its
* own. This field is valid only if response_buffer field is NULL, or is
* pointing to this field. In that case the buffer is freed together with the
* query. */
void* internal_resp_buffer;
/* Internal response buffer size used if query creator didn't provide its
* own. This field is valid only if response_size field is NULL, or is
* pointing to this field. */
uint32_t internal_resp_size;
/* Number of outstanding references to the query. */
int ref_count;
/* Common query header. Query data immediately follows this header, so it
* must be last field in SDKCtlQuery descriptor. */
SDKCtlQueryHeader header;
};
/* Query reply descriptor.
*
* All replies to a query, sent and received via SDK controller socket begin with
* this header, with query reply data immediately following this header.
*/
typedef struct SDKCtlQueryReplyHeader {
/* Data packet header for this reply. */
SDKCtlPacketHeader packet;
/* An identifier for the query that is addressed with this reply. Matches the
* query_id field of the SDKCtlQueryHeader of the original query. */
int query_id;
} SDKCtlQueryReplyHeader;
/********************************************************************************
* SDKCtlMessage declarations
*******************************************************************************/
/* Message packet descriptor.
*
* All messages, sent and received via SDK controller socket begin with this
* header, with message data immediately following this header.
*/
typedef struct SDKCtlMessageHeader {
/* Data packet header for this message. */
SDKCtlPacketHeader packet;
/* Message type. Negative SDKCTL_MSG_XXX values denote internal port
* messages. */
int msg_type;
} SDKCtlMessageHeader;
/* Message descriptor, allocated by this API for messages to be sent to SDK
* controller.
*
* The descriptor is an SDKCtlPacket extended with message type. SDKCtlPacket
* ends with the common packet header, so the header followed by msg_type
* mirrors the fields of SDKCtlMessageHeader (NOTE(review): this relies on there
* being no padding between the two - confirm for the target ABI). Message data
* immediately follows this descriptor.
*/
struct SDKCtlMessage {
/* Data packet descriptor for this message. Must stay the first field: the
* descriptor is allocated as, and cast from SDKCtlPacket. */
SDKCtlPacket packet;
/* Message type. */
int msg_type;
};
/********************************************************************************
* SDK Control Socket declarations
*******************************************************************************/
/* Enumerates SDKCtlSocket states. The current state is kept in the 'state'
* field of SDKCtlSocket descriptor. */
typedef enum SDKCtlSocketState {
/* Socket is disconnected from SDK controller. */
SDKCTL_SOCKET_DISCONNECTED,
/* Connection to SDK controller is in progress. */
SDKCTL_SOCKET_CONNECTING,
/* Socket is connected to an SDK controller service. */
SDKCTL_SOCKET_CONNECTED
} SDKCtlSocketState;
/* Enumerates SDKCtlSocket I/O dispatcher states. Each state names the piece of
* incoming data that the dispatcher is waiting for. */
typedef enum SDKCtlIODispatcherState {
/* I/O dispatcher expects a packet header. */
SDKCTL_IODISP_EXPECT_HEADER,
/* I/O dispatcher expects packet data. */
SDKCTL_IODISP_EXPECT_DATA,
/* I/O dispatcher expects query response header. */
SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER,
/* I/O dispatcher expects query response data. */
SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA,
} SDKCtlIODispatcherState;
/* SDKCtlSocket I/O dispatcher descriptor. One dispatcher is embedded into each
* SDKCtlSocket descriptor. */
typedef struct SDKCtlIODispatcher {
/* SDKCtlSocket instance for this dispatcher. */
SDKCtlSocket* sdkctl;
/* Dispatcher state. */
SDKCtlIODispatcherState state;
/* Unites all types of headers used in SDK controller data exchange. All
* members begin with SDKCtlPacketHeader, so the common part can be accessed
* through any of them. */
union {
/* Common packet header. */
SDKCtlPacketHeader packet_header;
/* Header for a query packet. */
SDKCtlQueryHeader query_header;
/* Header for a message packet. */
SDKCtlMessageHeader message_header;
/* Header for a query response packet. */
SDKCtlQueryReplyHeader query_reply_header;
};
/* Descriptor of a packet that is being received from SDK controller. */
SDKCtlPacket* packet;
/* A query for which a reply is currently being received. The query removal
* routines check this field before searching the list of active queries. */
SDKCtlQuery* current_query;
} SDKCtlIODispatcher;
/* SDK controller socket descriptor. */
struct SDKCtlSocket {
/* SDK controller socket state */
SDKCtlSocketState state;
/* SDK controller port status */
SdkCtlPortStatus port_status;
/* I/O dispatcher for the socket. */
SDKCtlIODispatcher io_dispatcher;
/* Asynchronous socket connected to SDK Controller on the device. */
AsyncSocket* as;
/* Client callback that monitors this socket connection. */
on_sdkctl_socket_connection_cb on_socket_connection;
/* Client callback that monitors SDK controller port connection. */
on_sdkctl_port_connection_cb on_port_connection;
/* A callback to invoke when a message is received from the SDK controller. */
on_sdkctl_message_cb on_message;
/* An opaque pointer associated with this socket. */
void* opaque;
/* Name of an SDK controller port this socket is connected to. Used as a
* prefix in log and trace output. */
char* service_name;
/* I/O looper for timers. */
Looper* looper;
/* Head of the active query list. NULL when the list is empty. */
SDKCtlQuery* query_head;
/* Tail of the active query list. NULL when the list is empty. */
SDKCtlQuery* query_tail;
/* Query ID generator that gets incremented for each new query. */
int next_query_id;
/* Timeout before trying to reconnect after disconnection. */
int reconnect_to;
/* Number of outstanding references to this descriptor. Packets and queries
* allocated for this socket each hold one reference. */
int ref_count;
/* Head of the recycled memory */
SDKCtlRecycled* recycler;
/* Recyclable block size. Only blocks of exactly this size are recycled. */
uint32_t recycler_block_size;
/* Maximum number of blocks to recycle. */
int recycler_max;
/* Number of blocks in the recycler. */
int recycler_count;
};
/********************************************************************************
* SDKCtlSocket recycling management
*******************************************************************************/
/* Gets a recycled block for a given SDKCtlSocket, or allocates a new memory
 * block.
 *
 * Requests that fit into the recycler block size are always served with a block
 * of exactly that size (taken from the recycler when it is not empty), so the
 * block can be recycled when it is freed. Larger requests are served with a
 * block of the requested size, which will not be recycled.
 * Param:
 *  sdkctl - SDKCtlSocket instance whose recycler serves the request.
 *  size - Number of bytes requested, including the leading SDKCtlRecycled.
 * Return:
 *  Allocated block, with its SDKCtlRecycled::size set to the actual size of the
 *  block. Allocation failures are reported via APANIC.
 */
static void*
_sdkctl_socket_alloc_recycler(SDKCtlSocket* sdkctl, uint32_t size)
{
    const uint32_t recyclable_size = sdkctl->recycler_block_size;
    const int fits = size <= recyclable_size;
    /* Actual size of the block that is going to be returned. */
    const uint32_t block_size = fits ? recyclable_size : size;
    SDKCtlRecycled* block;

    if (fits && sdkctl->recycler != NULL) {
        /* There are blocks in the recycler, and requested size fits. */
        assert(sdkctl->recycler_count > 0);
        block = sdkctl->recycler;
        sdkctl->recycler = block->next;
        sdkctl->recycler_count--;
    } else {
        /* Either the recycler is empty, or requested size doesn't fit it. */
        block = malloc(block_size);
        if (block == NULL) {
            /* Sizes are unsigned: print them as such. */
            APANIC("SDKCtl %s: Unable to allocate %u bytes block.",
                   sdkctl->service_name, (unsigned)block_size);
        }
    }

    /* 'size' shares storage with 'next', so it must be (re)written every time
     * the block leaves the recycler. */
    block->size = block_size;
    return block;
}
/* Recycles, or frees a block of memory for a given SDKCtlSocket.
 *
 * A block goes back to the recycler only if its recorded size equals the
 * recycler block size, and the recycler has not reached its capacity. In all
 * other cases the memory is released with free().
 * Param:
 *  sdkctl - SDKCtlSocket instance that owns the recycler.
 *  mem - Block obtained from _sdkctl_socket_alloc_recycler.
 */
static void
_sdkctl_socket_free_recycler(SDKCtlSocket* sdkctl, void* mem)
{
    SDKCtlRecycled* const blk = mem;
    const int right_size = blk->size == sdkctl->recycler_block_size;
    const int has_room = sdkctl->recycler_count != sdkctl->recycler_max;

    if (right_size && has_room) {
        /* Push the block on top of the recycler list. */
        assert(sdkctl->recycler_count >= 0);
        blk->next = sdkctl->recycler;
        sdkctl->recycler = blk;
        sdkctl->recycler_count++;
        return;
    }

    /* Recycler is full, or block cannot be recycled. */
    free(mem);
}
/* Empties the recycler for a given SDKCtlSocket: frees every block that is
 * listed there, and resets the list head and the block counter. */
static void
_sdkctl_socket_empty_recycler(SDKCtlSocket* sdkctl)
{
    SDKCtlRecycled* cur;
    SDKCtlRecycled* following;

    for (cur = sdkctl->recycler; cur != NULL; cur = following) {
        /* Grab the link before the block goes away. */
        following = cur->next;
        free(cur);
    }
    sdkctl->recycler_count = 0;
    sdkctl->recycler = NULL;
}
/********************************************************************************
* SDKCtlSocket query list management
*******************************************************************************/
/* Adds a query to the tail of the list of active queries.
 *
 * The list is the one owned by the socket recorded in the query itself
 * (query->sdkctl). The query gets an extra reference that is held for as long
 * as the query stays in the list.
 * Param:
 *  query - Query to add to the list.
 */
static void
_sdkctl_socket_add_query(SDKCtlQuery* query)
{
    SDKCtlSocket* const sdkctl = query->sdkctl;

    /* The query becomes the new tail, so it must terminate the list no matter
     * what its link contained before (list traversals stop at NULL). */
    query->next = NULL;

    if (sdkctl->query_head == NULL) {
        assert(sdkctl->query_tail == NULL);
        sdkctl->query_head = sdkctl->query_tail = query;
    } else {
        sdkctl->query_tail->next = query;
        sdkctl->query_tail = query;
    }

    /* Keep the query referenced while it's in the list. */
    sdkctl_query_reference(query);
}
/* Removes a query from the list of active queries.
 *
 * If the query is the one currently handled by the I/O dispatcher, it is
 * detached from the dispatcher instead of being searched in the list.
 * Param:
 *  query - Query to remove from the list of active queries. If query has been
 *      removed, it will be dereferenced to offset the reference that was made
 *      when the query has been added to the list. The caller is expected to
 *      hold its own reference to the query across this call.
 * Return:
 *  Boolean: 1 if query has been removed, or 0 if query has not been found in the
 *  list of active queries.
 */
static int
_sdkctl_socket_remove_query(SDKCtlQuery* query)
{
    SDKCtlSocket* const owner = query->sdkctl;
    SDKCtlQuery* before = NULL;
    SDKCtlQuery* cur;

    /* Quick check: the query could be currently handled by the dispatcher. */
    if (query == owner->io_dispatcher.current_query) {
        owner->io_dispatcher.current_query = NULL;
        sdkctl_query_release(query);
        return 1;
    }

    /* Look the query up, remembering its predecessor. */
    for (cur = owner->query_head; cur != NULL; cur = cur->next) {
        if (cur == query) {
            break;
        }
        before = cur;
    }
    if (cur == NULL) {
        D("SDKCtl %s: Query %p is not found in the list.",
          owner->service_name, query);
        return 0;
    }

    /* Unlink the query. */
    if (before != NULL) {
        /* Query is in the middle / at the end of the list. */
        assert(query != owner->query_head);
        before->next = query->next;
    } else {
        /* Query is at the head of the list. */
        assert(query == owner->query_head);
        owner->query_head = query->next;
    }
    if (query == owner->query_tail) {
        /* Query was the last one: its predecessor (if any) is the new tail. */
        assert(query->next == NULL);
        owner->query_tail = before;
    }
    query->next = NULL;

    /* Drop the reference that was held on behalf of the list. */
    sdkctl_query_release(query);
    return 1;
}
/* Removes a query (based on query ID) from the list of active queries.
 *
 * If the query currently handled by the I/O dispatcher carries the requested
 * ID, that query is detached from the dispatcher and returned without
 * searching the list.
 * Param:
 *  sdkctl - SDKCtlSocket instance that owns the query.
 *  query_id - Identifies the query to remove.
 * Return:
 *  A query removed from the list of active queries, or NULL if query with the
 *  given ID has not been found in the list. Note that query returned from this
 *  routine still holds the reference made when the query has been added to the
 *  list.
 */
static SDKCtlQuery*
_sdkctl_socket_remove_query_id(SDKCtlSocket* sdkctl, int query_id)
{
    SDKCtlQuery* const in_flight = sdkctl->io_dispatcher.current_query;
    SDKCtlQuery* before = NULL;
    SDKCtlQuery* found;

    /* Quick check: the query could be currently handled by dispatcher. */
    if (in_flight != NULL && in_flight->header.query_id == query_id) {
        sdkctl->io_dispatcher.current_query = NULL;
        return in_flight;
    }

    /* Look the query up, remembering its predecessor. */
    for (found = sdkctl->query_head; found != NULL; found = found->next) {
        if (found->header.query_id == query_id) {
            break;
        }
        before = found;
    }
    if (found == NULL) {
        D("SDKCtl %s: Query ID %d is not found in the list.",
          sdkctl->service_name, query_id);
        return NULL;
    }

    /* Unlink the query. */
    if (before != NULL) {
        /* Query is in the middle, or at the end of the list. */
        assert(found != sdkctl->query_head);
        before->next = found->next;
    } else {
        /* Query is at the head of the list. */
        assert(found == sdkctl->query_head);
        sdkctl->query_head = found->next;
    }
    if (found == sdkctl->query_tail) {
        /* Query was the last one: its predecessor (if any) is the new tail. */
        assert(found->next == NULL);
        sdkctl->query_tail = before;
    }
    found->next = NULL;
    return found;
}
/* Pulls the first query from the list of active queries.
 * Param:
 *  sdkctl - SDKCtlSocket instance that owns the query.
 * Return:
 *  A query removed from the head of the list of active queries, or NULL if query
 *  list is empty. The returned query is fully detached from the list (its link
 *  is reset), and still holds the reference made when the query has been added
 *  to the list: this routine does not release it.
 */
static SDKCtlQuery*
_sdkctl_socket_pull_first_query(SDKCtlSocket* sdkctl)
{
    SDKCtlQuery* const query = sdkctl->query_head;

    if (query == NULL) {
        return NULL;
    }

    sdkctl->query_head = query->next;
    if (sdkctl->query_head == NULL) {
        /* That was the only query in the list. */
        sdkctl->query_tail = NULL;
    }
    /* Same as the other removal routines do: don't let a detached query keep
     * pointing into the list. */
    query->next = NULL;
    return query;
}
/* Generates new query ID for the given SDKCtl: advances socket's ID generator
 * by one, and returns the advanced value. */
static int
_sdkctl_socket_next_query_id(SDKCtlSocket* sdkctl)
{
    sdkctl->next_query_id += 1;
    return sdkctl->next_query_id;
}
/********************************************************************************
* SDKCtlPacket implementation
*******************************************************************************/
/* Allocates a packet.
 * Param:
 *  sdkctl - SDKCtlSocket instance that will transmit the packet. The packet
 *      takes a reference on it.
 *  size - Value to save in the 'size' field of the packet header. The
 *      descriptor is allocated for sizeof(SDKCtlPacket) + size bytes.
 *  type - Packet type (one of SDKCTL_PACKET_XXX) to save in the header.
 * Return:
 *  New packet descriptor holding one reference, with initialized header.
 */
static SDKCtlPacket*
_sdkctl_packet_new(SDKCtlSocket* sdkctl, uint32_t size, int type)
{
    SDKCtlPacket* pkt;
    SDKCtlPacketHeader* hdr;

    /* Descriptor must be large enough to contain packet data. */
    pkt = _sdkctl_socket_alloc_recycler(sdkctl, sizeof(SDKCtlPacket) + size);

    hdr = &pkt->header;
    hdr->signature = _sdkctl_packet_sig;
    hdr->size = size;
    hdr->type = type;

    pkt->ref_count = 1;
    pkt->sdkctl = sdkctl;
    /* The socket must outlive the packets it owns. */
    sdkctl_socket_reference(sdkctl);

    T("SDKCtl %s: Packet %p of type %d is allocated for %d bytes transfer.",
      sdkctl->service_name, pkt, type, size);
    return pkt;
}
/* Frees a packet: returns its memory to the recycler of the owning socket, and
 * then drops the reference that the packet held on that socket. */
static void
_sdkctl_packet_free(SDKCtlPacket* packet)
{
    /* Saved up front: the descriptor must not be read after it is recycled. */
    SDKCtlSocket* const owner = packet->sdkctl;

    _sdkctl_socket_free_recycler(owner, packet);
    T("SDKCtl %s: Packet %p is freed.", owner->service_name, packet);
    sdkctl_socket_release(owner);
}
/* References a packet.
 * Return:
 *  Number of references to the packet after this one has been added.
 */
int
_sdkctl_packet_reference(SDKCtlPacket* packet)
{
    assert(packet->ref_count > 0);
    return ++packet->ref_count;
}
/* Releases a packet. The packet is freed when its last reference is dropped.
 * Return:
 *  Number of references left, or 0 if the packet has been freed (in which case
 *  the descriptor must not be used anymore).
 */
int
_sdkctl_packet_release(SDKCtlPacket* packet)
{
    assert(packet->ref_count > 0);
    if (--packet->ref_count != 0) {
        /* Somebody else is still using the packet. */
        return packet->ref_count;
    }
    _sdkctl_packet_free(packet);
    return 0;
}
/* An I/O callback invoked on packet transmission.
 *
 * The only state that changes anything is ASIO_STATE_FINISHED, where the
 * reference taken for the I/O in _sdkctl_packet_transmit is dropped. Other
 * states are traced (when tracing is compiled in), and otherwise ignored.
 * Param:
 *  io_opaque - SDKCtlPacket instance of the packet that's being sent with this
 *      I/O.
 *  asio - Write I/O descriptor (not used).
 *  status - I/O status.
 * Return:
 *  Always ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_sdkctl_packet_send_io(void* io_opaque,
                          AsyncSocketIO* asio,
                          AsyncIOState status)
{
    SDKCtlPacket* const pkt = io_opaque;

    /* Keep the packet alive while we're in this callback. */
    _sdkctl_packet_reference(pkt);

    if (status == ASIO_STATE_FINISHED) {
        /* Time to disassociate the packet with the I/O. */
        _sdkctl_packet_release(pkt);
    } else if (status == ASIO_STATE_SUCCEEDED) {
        /* Packet has been sent to the service. */
        T("SDKCtl %s: Packet %p transmission has succeeded.",
          pkt->sdkctl->service_name, pkt);
    } else if (status == ASIO_STATE_CANCELLED) {
        T("SDKCtl %s: Packet %p is cancelled.",
          pkt->sdkctl->service_name, pkt);
    } else if (status == ASIO_STATE_FAILED) {
        T("SDKCtl %s: Packet %p has failed: %d -> %s",
          pkt->sdkctl->service_name, pkt, errno, strerror(errno));
    }
    /* Anything else is a transitional state: nothing to do. */

    _sdkctl_packet_release(pkt);
    return ASIO_ACTION_DONE;
}
/* Transmits a packet to SDK Controller.
 *
 * Starts an asynchronous write of header.size bytes, beginning at the packet
 * header. The packet is referenced for the duration of the I/O; that reference
 * is dropped in _on_sdkctl_packet_send_io when the I/O is finished.
 * Param:
 *  packet - Packet to transmit.
 */
static void
_sdkctl_packet_transmit(SDKCtlPacket* packet)
{
    SDKCtlPacketHeader* const hdr = &packet->header;

    assert(hdr->signature == _sdkctl_packet_sig);

    /* Reference to associate with the I/O */
    _sdkctl_packet_reference(packet);

    async_socket_write_rel(packet->sdkctl->as, hdr, hdr->size,
                           _on_sdkctl_packet_send_io, packet, -1);

    T("SDKCtl %s: Packet %p size %d is being sent.",
      packet->sdkctl->service_name, packet, hdr->size);
}
/********************************************************************************
* SDKCtlDirectPacket implementation
********************************************************************************/
/* Allocates a direct packet descriptor.
 * Param:
 *  sdkctl - SDKCtlSocket instance that will transmit the packet. The packet
 *      takes a reference on it.
 * Return:
 *  New direct packet descriptor holding one reference. Data to send, and the
 *  transmission callback are not set here: they are supplied later, with
 *  sdkctl_direct_packet_send.
 */
SDKCtlDirectPacket*
sdkctl_direct_packet_new(SDKCtlSocket* sdkctl)
{
    SDKCtlDirectPacket* dpkt;

    dpkt = _sdkctl_socket_alloc_recycler(sdkctl, sizeof(SDKCtlDirectPacket));
    dpkt->ref_count = 1;
    dpkt->sdkctl = sdkctl;

    /* The socket must outlive the packets it owns. */
    sdkctl_socket_reference(dpkt->sdkctl);

    T("SDKCtl %s: Direct packet %p is allocated.", sdkctl->service_name, dpkt);
    return dpkt;
}
/* Frees a direct packet: returns the descriptor to the recycler of the owning
 * socket, and then drops the reference that the packet held on that socket.
 * Data referenced by the packet belongs to the client, and is not touched. */
static void
_sdkctl_direct_packet_free(SDKCtlDirectPacket* packet)
{
    /* Saved up front: the descriptor must not be read after it is recycled. */
    SDKCtlSocket* const owner = packet->sdkctl;

    _sdkctl_socket_free_recycler(owner, packet);
    T("SDKCtl %s: Direct packet %p is freed.", owner->service_name, packet);
    sdkctl_socket_release(owner);
}
/* References a direct packet.
 * Return:
 *  Number of references to the packet after this one has been added.
 */
int
sdkctl_direct_packet_reference(SDKCtlDirectPacket* packet)
{
    assert(packet->ref_count > 0);
    return ++packet->ref_count;
}
/* Releases a direct packet. The descriptor is freed when its last reference is
 * dropped.
 * Return:
 *  Number of references left, or 0 if the descriptor has been freed (in which
 *  case it must not be used anymore).
 */
int
sdkctl_direct_packet_release(SDKCtlDirectPacket* packet)
{
    assert(packet->ref_count > 0);
    if (--packet->ref_count != 0) {
        /* Somebody else is still using the packet. */
        return packet->ref_count;
    }
    _sdkctl_direct_packet_free(packet);
    return 0;
}
/* An I/O callback invoked on direct packet transmission.
 *
 * Success, cancellation, and failure are forwarded to the client's on_sent
 * callback. On ASIO_STATE_FINISHED the reference taken for the I/O in
 * sdkctl_direct_packet_send is dropped. Other states are ignored.
 * Param:
 *  io_opaque - SDKCtlDirectPacket instance of the packet that's being sent with
 *      this I/O.
 *  asio - Write I/O descriptor (not used).
 *  status - I/O status.
 * Return:
 *  Always ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_sdkctl_direct_packet_send_io(void* io_opaque,
                                 AsyncSocketIO* asio,
                                 AsyncIOState status)
{
    SDKCtlDirectPacket* const dpkt = io_opaque;
    /* Set for the states that the client must be told about. */
    int notify_client = 0;

    /* Keep the packet alive while we're in this callback. */
    sdkctl_direct_packet_reference(dpkt);

    switch (status) {
        case ASIO_STATE_FINISHED:
            /* Time to disassociate with the I/O. */
            sdkctl_direct_packet_release(dpkt);
            break;

        case ASIO_STATE_SUCCEEDED:
            /* Packet has been sent to the service. */
            T("SDKCtl %s: Direct packet %p transmission has succeeded.",
              dpkt->sdkctl->service_name, dpkt);
            notify_client = 1;
            break;

        case ASIO_STATE_CANCELLED:
            T("SDKCtl %s: Direct packet %p is cancelled.",
              dpkt->sdkctl->service_name, dpkt);
            notify_client = 1;
            break;

        case ASIO_STATE_FAILED:
            T("SDKCtl %s: Direct packet %p has failed: %d -> %s",
              dpkt->sdkctl->service_name, dpkt, errno, strerror(errno));
            notify_client = 1;
            break;

        default:
            /* Transitional state. */
            break;
    }

    if (notify_client) {
        dpkt->on_sent(dpkt->on_sent_opaque, dpkt, status);
    }

    sdkctl_direct_packet_release(dpkt);
    return ASIO_ACTION_DONE;
}
/* Sends client's data with a direct packet.
 *
 * Starts an asynchronous write of the client's buffer. The descriptor is
 * referenced for the duration of the I/O; that reference is dropped in
 * _on_sdkctl_direct_packet_send_io when the I/O is finished.
 * Param:
 *  packet - Direct packet descriptor to use for the transfer.
 *  data - Data to send. Must begin with an initialized SDKCtlPacketHeader,
 *      whose 'size' field defines how many bytes are sent.
 *  cb - Callback to invoke on success, cancellation, or failure of the
 *      transmission.
 *  cb_opaque - An opaque pointer to pass to the callback.
 */
void
sdkctl_direct_packet_send(SDKCtlDirectPacket* packet,
                          void* data,
                          on_sdkctl_direct_cb cb,
                          void* cb_opaque)
{
    SDKCtlPacketHeader* const hdr = (SDKCtlPacketHeader*)data;

    packet->on_sent_opaque = cb_opaque;
    packet->on_sent = cb;
    packet->packet = hdr;
    assert(hdr->signature == _sdkctl_packet_sig);

    /* Reference for I/O */
    sdkctl_direct_packet_reference(packet);

    async_socket_write_rel(packet->sdkctl->as, hdr, hdr->size,
                           _on_sdkctl_direct_packet_send_io, packet, -1);

    T("SDKCtl %s: Direct packet %p size %d is being sent",
      packet->sdkctl->service_name, packet, hdr->size);
}
/********************************************************************************
* SDKCtlMessage implementation
*******************************************************************************/
/* Allocates a message descriptor.
 * Param:
 *  sdkctl - SDKCtlSocket instance that will transmit the message.
 *  msg_size - Size of the message data, not counting any headers.
 *  msg_type - Message type to save in the descriptor.
 * Return:
 *  New message descriptor, built on top of a packet of type
 *  SDKCTL_PACKET_MESSAGE whose transfer size covers the message header and the
 *  message data.
 */
static SDKCtlMessage*
_sdkctl_message_new(SDKCtlSocket* sdkctl, uint32_t msg_size, int msg_type)
{
    /* What goes over the wire: message header followed by message data. */
    const uint32_t transfer_size = sizeof(SDKCtlMessageHeader) + msg_size;
    SDKCtlPacket* const pkt =
        _sdkctl_packet_new(sdkctl, transfer_size, SDKCTL_PACKET_MESSAGE);
    SDKCtlMessage* const message = (SDKCtlMessage*)pkt;

    message->msg_type = msg_type;
    return message;
}
/* References a message, by referencing the packet it is built on.
 * Return:
 *  Number of references after this one has been added.
 */
int
sdkctl_message_reference(SDKCtlMessage* msg)
{
    SDKCtlPacket* const pkt = &msg->packet;
    return _sdkctl_packet_reference(pkt);
}
/* Releases a message, by releasing the packet it is built on.
 * Return:
 *  Number of references left, or 0 if the message has been freed.
 */
int
sdkctl_message_release(SDKCtlMessage* msg)
{
    SDKCtlPacket* const pkt = &msg->packet;
    return _sdkctl_packet_release(pkt);
}
/* Builds a message, and starts its transmission to SDK controller.
 * Param:
 *  sdkctl - SDKCtlSocket instance to send the message with.
 *  msg_type - Message type.
 *  data - Message data. Copied into the message, unless it is NULL.
 *  size - Size of the message data. Nothing is copied if it is zero.
 * Return:
 *  Descriptor of the message that is being sent. The reference created at
 *  allocation is still held when this routine returns.
 */
SDKCtlMessage*
sdkctl_message_send(SDKCtlSocket* sdkctl,
                    int msg_type,
                    const void* data,
                    uint32_t size)
{
    SDKCtlMessage* const message = _sdkctl_message_new(sdkctl, size, msg_type);
    /* Message data lives right behind the descriptor. */
    void* const payload = message + 1;

    if (data != NULL && size != 0) {
        memcpy(payload, data, size);
    }
    _sdkctl_packet_transmit(&message->packet);
    return message;
}
/* Gets the number of bytes taken by the message header (SDKCtlMessageHeader)
 * that precedes message data. */
int
sdkctl_message_get_header_size(void)
{
    const int header_size = (int)sizeof(SDKCtlMessageHeader);
    return header_size;
}
/* Initializes message header in a buffer provided by the caller.
 * Param:
 *  msg - Buffer that begins with room for SDKCtlMessageHeader.
 *  msg_type - Message type to save in the header.
 *  msg_size - Size of the message data, not counting the header. Packet size
 *      saved in the header is this value plus the size of the header.
 */
void
sdkctl_init_message_header(void* msg, int msg_type, int msg_size)
{
    SDKCtlMessageHeader* const hdr = (SDKCtlMessageHeader*)msg;

    hdr->msg_type = msg_type;
    hdr->packet.type = SDKCTL_PACKET_MESSAGE;
    hdr->packet.size = (int)(sizeof(SDKCtlMessageHeader) + msg_size);
    hdr->packet.signature = _sdkctl_packet_sig;
}
/********************************************************************************
* SDKCtlQuery implementation
*******************************************************************************/
/* Frees query descriptor.
 *
 * Releases the internal response buffer (if the query was using one), shuts
 * down query's timer, returns the descriptor to the recycler, and drops the
 * reference on the socket that owned the query. Does nothing for NULL.
 */
static void
_sdkctl_query_free(SDKCtlQuery* query)
{
    SDKCtlSocket* owner;
    int used_internal_buffer;

    if (query == NULL) {
        return;
    }
    owner = query->sdkctl;

    /* Internal buffer is in use when the creator didn't supply its own. */
    used_internal_buffer =
        query->response_buffer == NULL ||
        query->response_buffer == &query->internal_resp_buffer;
    if (used_internal_buffer && query->internal_resp_buffer != NULL) {
        free(query->internal_resp_buffer);
    }

    loopTimer_done(query->timer);

    /* Recycle the descriptor. */
    _sdkctl_socket_free_recycler(owner, query);
    T("SDKCtl %s: Query %p is freed.", owner->service_name, query);

    /* Release socket that owned this query. */
    sdkctl_socket_release(owner);
}
/* Cancels timeout for the query, by stopping query's timer.
 *
 * For the simplicity of implementation, the dispatcher will cancel query timer
 * when query response data begins to flow in. If we let the timer expire at
 * that stage, we will end up with data flowing in without a real place to
 * accommodate it.
 */
static void
_sdkctl_query_cancel_timeout(SDKCtlQuery* query)
{
    LoopTimer* const deadline_timer = query->timer;

    loopTimer_stop(deadline_timer);
    T("SDKCtl %s: Query %p ID %d deadline %lld is cancelled.",
      query->sdkctl->service_name, query, query->header.query_id, query->deadline);
}
/*
* Query I/O callbacks.
*/
/* Callback that is invoked by the I/O dispatcher when query is successfully
 * completed (i.e. response to the query is received).
 *
 * Stops query's deadline timer, and then reports ASIO_STATE_SUCCEEDED to the
 * query callback. Value returned by the callback is ignored.
 */
static void
_on_sdkctl_query_completed(SDKCtlQuery* query)
{
    T("SDKCtl %s: Query %p ID %d is completed.",
      query->sdkctl->service_name, query, query->header.query_id);

    /* No deadline anymore: the response is here. */
    _sdkctl_query_cancel_timeout(query);

    (void)query->query_cb(query->query_opaque, query, ASIO_STATE_SUCCEEDED);
}
/* A callback that is invoked on query cancellation.
 *
 * Query cancellation means that SDK controller is disconnected. In turn, this
 * means that SDK controller socket will handle disconnection in its connection
 * callback. So, all that is done here is stopping query's deadline timer, and
 * reporting ASIO_STATE_CANCELLED to the query callback. Value returned by the
 * callback is ignored.
 */
static void
_on_sdkctl_query_cancelled(SDKCtlQuery* query)
{
    /* No deadline anymore: the query is not going to be answered. */
    _sdkctl_query_cancel_timeout(query);

    (void)query->query_cb(query->query_opaque, query, ASIO_STATE_CANCELLED);
}
/* A timer callback that is invoked on query timeout.
 *
 * Reports ASIO_STATE_TIMED_OUT to the query callback. Unless the callback asks
 * for a retry (ASIO_ACTION_RETRY), the query is removed from the list of active
 * queries.
 * Param:
 *  opaque - SDKCtlQuery instance.
 */
static void
_on_skdctl_query_timeout(void* opaque)
{
    SDKCtlQuery* const expired = opaque;
    AsyncIOAction verdict;

    D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out at %lld",
      expired->sdkctl->service_name, expired, expired->header.query_id,
      expired->deadline, async_socket_deadline(expired->sdkctl->as, 0));

    /* Keep the query alive while we're in this callback. */
    sdkctl_query_reference(expired);

    /* Let the client know. Note that client may extend the deadline, and retry
     * the query. */
    verdict = expired->query_cb(expired->query_opaque, expired,
                                ASIO_STATE_TIMED_OUT);
    if (verdict != ASIO_ACTION_RETRY) {
        /* Not retrying: the query is done. */
        _sdkctl_socket_remove_query(expired);
    }

    sdkctl_query_release(expired);
}
/* A callback that is invoked when query has been sent to the SDK controller.
 *
 * Reports ASIO_STATE_CONTINUES to the query callback (value returned by the
 * callback is ignored), and then arms query's timer to fire at the deadline
 * recorded in the query.
 */
static void
_on_sdkctl_query_sent(SDKCtlQuery* query)
{
    T("SDKCtl %s: Sent %d bytes of query %p ID %d of type %d",
      query->sdkctl->service_name, query->header.packet.size, query,
      query->header.query_id, query->header.query_type);

    /* Tell the client that the query is on its way. */
    (void)query->query_cb(query->query_opaque, query, ASIO_STATE_CONTINUES);

    /* From now on the response is expected to come through the dispatcher
     * loop, no later than at query's deadline. */
    loopTimer_startAbsolute(query->timer, query->deadline);
}
/* An I/O callback invoked on query transmission.
 *
 * - Success: the client is notified, and query's deadline timer is started.
 * - Cancellation: the query is removed from the list of active queries, and the
 *   client is notified.
 * - Timeout / failure: the client decides. Unless it asks for a retry, the
 *   query is removed from the list of active queries.
 * - Finished: the reference associated with the I/O is dropped.
 * Param:
 *  io_opaque - SDKCtlQuery instance of the query that's being sent with this
 *      I/O.
 *  asio - Write I/O descriptor (not used).
 *  status - I/O status.
 * Return:
 *  Action returned by the query callback for timeout and failure, or
 *  ASIO_ACTION_DONE for all other states.
 */
static AsyncIOAction
_on_sdkctl_query_send_io(void* io_opaque,
                         AsyncSocketIO* asio,
                         AsyncIOState status)
{
    SDKCtlQuery* const q = io_opaque;
    AsyncIOAction result = ASIO_ACTION_DONE;

    /* Keep the query alive while we're in this callback. */
    sdkctl_query_reference(q);

    switch (status) {
        case ASIO_STATE_FINISHED:
            /* Time to disassociate with the I/O. */
            sdkctl_query_release(q);
            break;

        case ASIO_STATE_SUCCEEDED:
            /* Query has been sent to the service. */
            _on_sdkctl_query_sent(q);
            break;

        case ASIO_STATE_CANCELLED:
            T("SDKCtl %s: Query %p ID %d is cancelled in transmission.",
              q->sdkctl->service_name, q, q->header.query_id);
            /* The query is not active anymore. */
            _sdkctl_socket_remove_query(q);
            _on_sdkctl_query_cancelled(q);
            break;

        case ASIO_STATE_TIMED_OUT:
            D("SDKCtl %s: Query %p ID %d with deadline %lld has timed out in transmission at %lld",
              q->sdkctl->service_name, q, q->header.query_id,
              q->deadline, async_socket_deadline(q->sdkctl->as, 0));
            result = q->query_cb(q->query_opaque, q, status);
            if (result == ASIO_ACTION_RETRY) {
                /* The client wants another attempt: keep the query. */
                break;
            }
            _sdkctl_socket_remove_query(q);
            break;

        case ASIO_STATE_FAILED:
            T("SDKCtl %s: Query %p ID %d failed in transmission: %d -> %s",
              q->sdkctl->service_name, q, q->header.query_id,
              errno, strerror(errno));
            /* It's up to the client to decide what to do on I/O failure. */
            result = q->query_cb(q->query_opaque, q, status);
            if (result == ASIO_ACTION_RETRY) {
                /* The client wants another attempt: keep the query. */
                break;
            }
            _sdkctl_socket_remove_query(q);
            break;

        default:
            /* Transitional state. */
            break;
    }

    sdkctl_query_release(q);
    return result;
}
/********************************************************************************
* SDKCtlQuery public API implementation
********************************************************************************/
/* Allocates and initializes a query descriptor with room for in_data_size
 * bytes of query data following it.
 * The returned query holds one reference, has no callback and no response
 * buffers set, carries an infinite deadline, and references the owning socket.
 */
SDKCtlQuery*
sdkctl_query_new(SDKCtlSocket* sdkctl, int query_type, uint32_t in_data_size)
{
    SDKCtlQuery* const created =
        _sdkctl_socket_alloc_recycler(sdkctl,
                                      sizeof(SDKCtlQuery) + in_data_size);

    /* Bookkeeping. */
    created->ref_count = 1;
    created->next = NULL;
    created->sdkctl = sdkctl;
    created->deadline = DURATION_INFINITE;

    /* Client callback and response storage are not set at this point. */
    created->query_cb = NULL;
    created->query_opaque = NULL;
    created->response_buffer = NULL;
    created->response_size = NULL;
    created->internal_resp_buffer = NULL;
    created->internal_resp_size = 0;

    /* Wire header. */
    created->header.packet.signature = _sdkctl_packet_sig;
    created->header.packet.size = sizeof(SDKCtlQueryHeader) + in_data_size;
    created->header.packet.type = SDKCTL_PACKET_QUERY;
    created->header.query_id = _sdkctl_socket_next_query_id(sdkctl);
    created->header.query_type = query_type;

    /* Timer that reports deadline expiration for this query. */
    loopTimer_init(created->timer, sdkctl->looper, _on_skdctl_query_timeout,
                   created);

    /* The query references the socket that owns it. */
    sdkctl_socket_reference(sdkctl);

    T("SDKCtl %s: Query %p ID %d type %d is created for %d bytes of data.",
      created->sdkctl->service_name, created, created->header.query_id,
      query_type, in_data_size);

    return created;
}
/* Creates a query and fills in its response storage, callback, and input data.
 * When response_buffer or response_size is NULL, the query's internal
 * counterpart is used instead. Input data is copied right after the query
 * descriptor when in_data is not NULL and in_data_size is not zero.
 */
SDKCtlQuery*
sdkctl_query_new_ex(SDKCtlSocket* sdkctl,
                    int query_type,
                    uint32_t in_data_size,
                    const void* in_data,
                    void** response_buffer,
                    uint32_t* response_size,
                    on_sdkctl_query_cb query_cb,
                    void* query_opaque)
{
    SDKCtlQuery* const created =
        sdkctl_query_new(sdkctl, query_type, in_data_size);

    /* Fall back to the internal storage for anything the creator omitted. */
    created->response_buffer = (response_buffer != NULL)
                                   ? response_buffer
                                   : &created->internal_resp_buffer;
    created->response_size = (response_size != NULL)
                                 ? response_size
                                 : &created->internal_resp_size;

    created->query_cb = query_cb;
    created->query_opaque = query_opaque;

    /* Query input data lives immediately after the descriptor. */
    if (in_data != NULL && in_data_size != 0) {
        memcpy(created + 1, in_data, in_data_size);
    }

    return created;
}
void
sdkctl_query_send(SDKCtlQuery* query, int to)
{
SDKCtlSocket* const sdkctl = query->sdkctl;
/* Initialize the deadline. */
query->deadline = async_socket_deadline(query->sdkctl->as, to);
/* List the query in the list of active queries. */
_sdkctl_socket_add_query(query);
/* Reference query associated with write I/O. */
sdkctl_query_reference(query);
assert(query->header.packet.signature == _sdkctl_packet_sig);
/* Transmit the query to SDK controller. */
async_socket_write_abs(sdkctl->as, &query->header, query->header.packet.size,
_on_sdkctl_query_send_io, query, query->deadline);
T("SDKCtl %s: Query %p ID %d type %d is being sent with deadline at %lld",
query->sdkctl->service_name, query, query->header.query_id,
query->header.query_type, query->deadline);
}
/* Convenience wrapper: creates a query with sdkctl_query_new_ex() and sends it
 * with sdkctl_query_send() using timeout 'to'. Returns the created query. */
SDKCtlQuery*
sdkctl_query_build_and_send(SDKCtlSocket* sdkctl,
                            int query_type,
                            uint32_t in_data_size,
                            const void* in_data,
                            void** response_buffer,
                            uint32_t* response_size,
                            on_sdkctl_query_cb query_cb,
                            void* query_opaque,
                            int to)
{
    SDKCtlQuery* const built = sdkctl_query_new_ex(sdkctl,
                                                   query_type,
                                                   in_data_size,
                                                   in_data,
                                                   response_buffer,
                                                   response_size,
                                                   query_cb,
                                                   query_opaque);

    sdkctl_query_send(built, to);

    return built;
}
/* Adds a reference to the query. Returns the new reference count. */
int
sdkctl_query_reference(SDKCtlQuery* query)
{
    assert(query->ref_count > 0);
    return ++query->ref_count;
}
/* Drops a reference to the query, freeing it when the last one is gone.
 * Returns the remaining reference count (0 when the query has been freed). */
int
sdkctl_query_release(SDKCtlQuery* query)
{
    assert(query->ref_count > 0);

    if (--query->ref_count != 0) {
        return query->ref_count;
    }

    /* That was the last reference: the descriptor goes away. */
    _sdkctl_query_free(query);
    return 0;
}
/* Returns the address of the query's input data, which is stored immediately
 * after the query descriptor. */
void*
sdkctl_query_get_buffer_in(SDKCtlQuery* query)
{
    return &query[1];
}
/* Returns the buffer holding the query's response: the one referenced by
 * response_buffer when it is set, the internal response buffer otherwise. */
void*
sdkctl_query_get_buffer_out(SDKCtlQuery* query)
{
    if (query->response_buffer == NULL) {
        return query->internal_resp_buffer;
    }
    return *query->response_buffer;
}
/********************************************************************************
* SDKCtlPacket implementation
*******************************************************************************/
/* A packet has been received from SDK controller.
 * Note that we expect the packet to be a message, since queries, and query
 * replies are handled separately.
 *
 * Internal port status messages update sdkctl->port_status and are reported
 * through on_port_connection; any other message type is forwarded to the
 * client's on_message callback together with the data that follows the
 * message header.
 *
 * Packets of an unknown type, and message packets that are too small to hold
 * a message header, are logged and dropped.
 */
static void
_on_sdkctl_packet_received(SDKCtlSocket* sdkctl, SDKCtlPacket* packet)
{
    SDKCtlMessage* msg;

    T("SDKCtl %s: Received packet size: %d, type: %d",
      sdkctl->service_name, packet->header.size, packet->header.type);
    assert(packet->header.signature == _sdkctl_packet_sig);

    if (packet->header.type != SDKCTL_PACKET_MESSAGE) {
        E("SDKCtl %s: Received unknown packet type %d size %d",
          sdkctl->service_name, packet->header.type, packet->header.size);
        return;
    }

    /* The size comes from the peer. A message shorter than its own header has
     * no valid msg_type, and would make the payload size computed below wrap
     * around. The comparison is done in a wide signed type so that it is
     * correct regardless of the signedness of the size field. */
    if ((int64_t)packet->header.size < (int64_t)sizeof(SDKCtlMessageHeader)) {
        E("SDKCtl %s: Message packet is too small: %d bytes",
          sdkctl->service_name, packet->header.size);
        return;
    }

    msg = (SDKCtlMessage*)packet;

    /* Lets see if this is an internal protocol message. */
    switch (msg->msg_type) {
        case SDKCTL_MSG_PORT_CONNECTED:
            sdkctl->port_status = SDKCTL_PORT_CONNECTED;
            sdkctl->on_port_connection(sdkctl->opaque, sdkctl,
                                       SDKCTL_PORT_CONNECTED);
            break;

        case SDKCTL_MSG_PORT_DISCONNECTED:
            sdkctl->port_status = SDKCTL_PORT_DISCONNECTED;
            sdkctl->on_port_connection(sdkctl->opaque, sdkctl,
                                       SDKCTL_PORT_DISCONNECTED);
            break;

        case SDKCTL_MSG_PORT_ENABLED:
            sdkctl->port_status = SDKCTL_PORT_ENABLED;
            sdkctl->on_port_connection(sdkctl->opaque, sdkctl,
                                       SDKCTL_PORT_ENABLED);
            break;

        case SDKCTL_MSG_PORT_DISABLED:
            sdkctl->port_status = SDKCTL_PORT_DISABLED;
            sdkctl->on_port_connection(sdkctl->opaque, sdkctl,
                                       SDKCTL_PORT_DISABLED);
            break;

        default:
            /* This is a higher-level message. Dispatch the message to the
             * client. */
            sdkctl->on_message(sdkctl->opaque, sdkctl, msg, msg->msg_type,
                               msg + 1,
                               packet->header.size - sizeof(SDKCtlMessageHeader));
            break;
    }
}
/********************************************************************************
* SDKCtlIODispatcher implementation
*******************************************************************************/
/* An I/O callback invoked when data gets received from the socket.
* Param:
* io_opaque SDKCtlIODispatcher instance associated with the reader.
* asio - Read I/O descriptor.
* status - I/O status.
*/
static AsyncIOAction _on_sdkctl_io_dispatcher_io(void* io_opaque,
AsyncSocketIO* asio,
AsyncIOState status);
/* Initializes the socket's I/O dispatcher to expect a packet header, and
 * registers the first header read (with no timeout) on the socket. */
static void
_sdkctl_io_dispatcher_start(SDKCtlSocket* sdkctl)
{
    SDKCtlIODispatcher* const disp = &sdkctl->io_dispatcher;

    disp->sdkctl = sdkctl;
    disp->state = SDKCTL_IODISP_EXPECT_HEADER;
    disp->current_query = NULL;
    disp->packet = NULL;

    /* Kick off the dispatcher loop with a packet header read. */
    async_socket_read_rel(sdkctl->as,
                          &disp->packet_header,
                          sizeof(SDKCtlPacketHeader),
                          _on_sdkctl_io_dispatcher_io,
                          disp,
                          -1);
}
/* Returns the socket's I/O dispatcher to its initial state: cancels the query
 * it currently owns (if any), releases the packet it is receiving (if any),
 * and makes it expect a packet header again. */
static void
_sdkctl_io_dispatcher_reset(SDKCtlSocket* sdkctl)
{
    SDKCtlIODispatcher* const disp = &sdkctl->io_dispatcher;
    SDKCtlQuery* const pending = disp->current_query;
    SDKCtlPacket* const partial = disp->packet;

    /* The query being completed by the dispatcher gets cancelled. */
    if (pending != NULL) {
        disp->current_query = NULL;
        _on_sdkctl_query_cancelled(pending);
        sdkctl_query_release(pending);
    }

    /* The packet buffer is given back. */
    if (partial != NULL) {
        _sdkctl_packet_release(partial);
        disp->packet = NULL;
    }

    disp->state = SDKCTL_IODISP_EXPECT_HEADER;

    T("SDKCtl %s: I/O Dispatcher is reset", sdkctl->service_name);
}
/*
* I/O dispatcher callbacks.
*/
/* Handles a failure that occurred while the dispatcher was reading from the
 * socket. Every I/O failure is handled like a disconnection: the socket is
 * disconnected, and the client is notified with ASIO_STATE_FAILED so it can
 * decide what to do next.
 */
static void
_on_io_dispatcher_io_failure(SDKCtlIODispatcher* dispatcher,
                             AsyncSocketIO* asio)
{
    SDKCtlSocket* const owner = dispatcher->sdkctl;

    D("SDKCtl %s: Dispatcher I/O failure: %d -> %s",
      owner->service_name, errno, strerror(errno));

    /* Cancel everything and disconnect... */
    sdkctl_socket_disconnect(owner);

    /* ...then let the client know; it may restore the connection from within
     * this callback. */
    owner->on_socket_connection(owner->opaque, owner, ASIO_STATE_FAILED);
}
/* Handles cancellation of the dispatcher's reader: cancels the query whose
 * reply was being received (if any), and discards the partially received
 * packet (if any). */
static void
_on_io_dispatcher_io_cancelled(SDKCtlIODispatcher* dispatcher,
                               AsyncSocketIO* asio)
{
    SDKCtlQuery* const pending = dispatcher->current_query;

    T("SDKCtl %s: Dispatcher I/O cancelled.", dispatcher->sdkctl->service_name);

    /* A query caught in the middle of receiving its reply is cancelled. */
    if (pending != NULL) {
        dispatcher->current_query = NULL;
        _on_sdkctl_query_cancelled(pending);
        sdkctl_query_release(pending);
    }

    /* Whatever packet data has been read so far is thrown away. */
    if (dispatcher->packet != NULL) {
        SDKCtlPacket* const partial = dispatcher->packet;
        _sdkctl_packet_release(partial);
        dispatcher->packet = NULL;
    }
}
/* A generic packet header has been received by I/O dispatcher.
 *
 * Validates the header (signature, and that the advertised packet size covers
 * at least the header appropriate for the packet type), then starts reading
 * the remainder of the packet:
 *  - for a query response, the rest of SDKCtlQueryReplyHeader is read first,
 *    so the relevant query can be located by its ID;
 *  - for everything else a packet buffer is allocated and the data that
 *    follows the header is read into it.
 *
 * An invalid header is a protocol failure, handled like an I/O failure:
 * errno is set to EINVAL and _on_io_dispatcher_io_failure() is invoked.
 *
 * Always returns ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_io_dispatcher_packet_header(SDKCtlIODispatcher* dispatcher,
                                AsyncSocketIO* asio)
{
    SDKCtlSocket* const sdkctl = dispatcher->sdkctl;

    T("SDKCtl %s: Packet header type %d, size %d is received.",
      dispatcher->sdkctl->service_name, dispatcher->packet_header.type,
      dispatcher->packet_header.size);

    /* Make sure we have a valid packet header. */
    if (dispatcher->packet_header.signature != _sdkctl_packet_sig) {
        E("SDKCtl %s: Invalid packet signature %x for packet type %d, size %d",
          sdkctl->service_name, dispatcher->packet_header.signature,
          dispatcher->packet_header.type, dispatcher->packet_header.size);
        /* This is a protocol failure. Treat it as I/O failure: disconnect, and
         * let the client to decide what to do next. */
        errno = EINVAL;
        _on_io_dispatcher_io_failure(dispatcher, asio);
        return ASIO_ACTION_DONE;
    }

    const int is_query_reply =
        dispatcher->packet_header.type == SDKCTL_PACKET_QUERY_RESPONSE;

    /* The packet size is supplied by the peer and includes the header. If it
     * is smaller than the header that must be present, the "remaining bytes"
     * subtraction below would wrap around and the subsequent read would run
     * past the allocated packet buffer. The comparison is done in a wide
     * signed type so it is correct regardless of the signedness of the size
     * field. */
    const size_t min_size = is_query_reply ? sizeof(SDKCtlQueryReplyHeader)
                                           : sizeof(SDKCtlPacketHeader);
    if ((int64_t)dispatcher->packet_header.size < (int64_t)min_size) {
        E("SDKCtl %s: Invalid packet size %d for packet type %d",
          sdkctl->service_name, dispatcher->packet_header.size,
          dispatcher->packet_header.type);
        errno = EINVAL;
        _on_io_dispatcher_io_failure(dispatcher, asio);
        return ASIO_ACTION_DONE;
    }

    /* Here we have three choices for the packet, that define the rest of
     * the data that follow it:
     * - Regular packet,
     * - Response to a query that has been sent to SDK controller,
     * - A query from SDK controller.
     * Update the state accordingly, and initiate reading of the
     * remaining of the packet.
     */
    if (is_query_reply) {
        /* This is a response to the query. Before receiving response data we
         * need to locate the relevant query, and use its response buffer to
         * read the data. For that we need to obtain query ID first. So,
         * initiate reading of the remaining part of SDKCtlQueryReplyHeader. */
        dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER;
        async_socket_read_rel(sdkctl->as, &dispatcher->query_reply_header.query_id,
                              sizeof(SDKCtlQueryReplyHeader) - sizeof(SDKCtlPacketHeader),
                              _on_sdkctl_io_dispatcher_io, dispatcher, -1);
    } else {
        /* For regular packets, as well as queries, we simply allocate buffer,
         * that fits the entire packet, and read the remainder of the data in
         * there. */
        dispatcher->state = SDKCTL_IODISP_EXPECT_DATA;
        dispatcher->packet =
            _sdkctl_packet_new(sdkctl, dispatcher->packet_header.size,
                               dispatcher->packet_header.type);
        /* Initiate reading of the packet data. */
        async_socket_read_rel(sdkctl->as, dispatcher->packet + 1,
                              dispatcher->packet_header.size - sizeof(SDKCtlPacketHeader),
                              _on_sdkctl_io_dispatcher_io, dispatcher, -1);
    }

    return ASIO_ACTION_DONE;
}
/* A complete generic packet has been received by the I/O dispatcher.
 * The packet is detached from the dispatcher, handed to
 * _on_sdkctl_packet_received(), released, and the dispatcher goes back to
 * waiting for the next packet header. Always returns ASIO_ACTION_DONE. */
static AsyncIOAction
_on_io_dispatcher_packet(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asio)
{
    SDKCtlSocket* const owner = dispatcher->sdkctl;
    SDKCtlPacket* const received = dispatcher->packet;

    /* The dispatcher no longer tracks this packet. */
    dispatcher->packet = NULL;

    T("SDKCtl %s: Packet type %d, size %d is received.",
      dispatcher->sdkctl->service_name, dispatcher->packet_header.type,
      dispatcher->packet_header.size);

    _on_sdkctl_packet_received(owner, received);
    _sdkctl_packet_release(received);

    /* Next I/O cycle: wait for a packet header. */
    dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
    async_socket_read_rel(owner->as,
                          &dispatcher->packet_header,
                          sizeof(SDKCtlPacketHeader),
                          _on_sdkctl_io_dispatcher_io,
                          dispatcher,
                          -1);

    return ASIO_ACTION_DONE;
}
/* A query reply header has been received by I/O dispatcher.
 *
 * Locates the query the reply belongs to (removing it from the socket's list
 * of active queries; the dispatcher owns it from here on), and starts reading
 * the reply data:
 *  - when the query is found, its timeout is cancelled and the data is read
 *    into the query's response buffer; a new buffer is allocated with malloc()
 *    when the current response size is smaller than the reply data;
 *  - when the query is not found, the data is read into a temporary packet
 *    that is discarded once the read completes.
 *
 * A reply whose advertised size is smaller than SDKCtlQueryReplyHeader is a
 * protocol failure, handled like an I/O failure (errno = EINVAL).
 *
 * Always returns ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_io_dispatcher_query_reply_header(SDKCtlIODispatcher* dispatcher,
                                     AsyncSocketIO* asio)
{
    SDKCtlSocket* const sdkctl = dispatcher->sdkctl;
    SDKCtlQuery* query;

    T("SDKCtl %s: Query reply header is received for query ID %d",
      dispatcher->sdkctl->service_name, dispatcher->query_reply_header.query_id);

    /* The packet size comes from the peer. If it does not even cover the
     * reply header, the data size computed below would wrap around into a
     * huge value. Check this before touching the list of active queries, so
     * the query stays there and gets cancelled by the disconnection. The
     * comparison is done in a wide signed type so it is correct regardless of
     * the signedness of the size field. */
    if ((int64_t)dispatcher->packet_header.size <
        (int64_t)sizeof(SDKCtlQueryReplyHeader)) {
        E("SDKCtl %s: Query reply packet size %d is smaller than the reply header",
          sdkctl->service_name, dispatcher->packet_header.size);
        errno = EINVAL;
        _on_io_dispatcher_io_failure(dispatcher, asio);
        return ASIO_ACTION_DONE;
    }

    /* Pull the query out of the list of active queries. It's the dispatcher that
     * owns this query now. */
    dispatcher->current_query =
        _sdkctl_socket_remove_query_id(sdkctl, dispatcher->query_reply_header.query_id);
    query = dispatcher->current_query;

    const uint32_t query_data_size =
        dispatcher->packet_header.size - sizeof(SDKCtlQueryReplyHeader);

    dispatcher->state = SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA;

    if (query == NULL) {
        D("%s: Query #%d is not found by dispatcher",
          dispatcher->sdkctl->service_name, dispatcher->query_reply_header.query_id);
        /* Query is not found. Just read the remainder of reply up in the air,
         * and then discard when it's over. */
        dispatcher->packet =
            _sdkctl_packet_new(sdkctl, dispatcher->packet_header.size,
                               dispatcher->packet_header.type);
        /* Copy query reply info to the packet. */
        memcpy(&dispatcher->packet->header, &dispatcher->query_reply_header,
               sizeof(SDKCtlQueryReplyHeader));
        async_socket_read_rel(sdkctl->as, dispatcher->packet + 1, query_data_size,
                              _on_sdkctl_io_dispatcher_io, dispatcher, -1);
    } else {
        /* Prepare to receive query reply. For the simplicity sake, cancel query
         * time out, so it doesn't expire on us while we're in the middle of
         * receiving query's reply. */
        _sdkctl_query_cancel_timeout(query);
        if (*query->response_size < query_data_size) {
            /* Current response storage is too small for this reply. */
            *query->response_buffer = malloc(query_data_size);
            if (*query->response_buffer == NULL) {
                APANIC("%s: Unable to allocate %d bytes for query response",
                       sdkctl->service_name, query_data_size);
            }
        }
        /* Save the actual query response size. */
        *query->response_size = query_data_size;
        /* Start reading query response. */
        async_socket_read_rel(sdkctl->as, *query->response_buffer,
                              *query->response_size, _on_sdkctl_io_dispatcher_io,
                              dispatcher, -1);
    }

    return ASIO_ACTION_DONE;
}
/* Query reply data has been received by the I/O dispatcher.
 * When the dispatcher owns a query, the query is completed and released.
 * Otherwise the data was read for a query that is no longer known, and the
 * temporary packet holding it is discarded. In both cases the dispatcher goes
 * back to waiting for the next packet header. Always returns
 * ASIO_ACTION_DONE. */
static AsyncIOAction
_on_io_dispatcher_query_reply(SDKCtlIODispatcher* dispatcher, AsyncSocketIO* asio)
{
    SDKCtlSocket* const owner = dispatcher->sdkctl;
    SDKCtlQuery* const answered = dispatcher->current_query;

    /* The dispatcher gives up the query, whatever happens next. */
    dispatcher->current_query = NULL;

    if (answered == NULL) {
        /* Data read "up in the air" for a cancelled query: drop it. */
        if (dispatcher->packet != NULL) {
            _sdkctl_packet_release(dispatcher->packet);
            dispatcher->packet = NULL;
        }
    } else {
        _ANDROID_ASSERT(answered->header.query_id == dispatcher->query_reply_header.query_id,
                        "SDKCtl %s: Query ID mismatch in I/O dispatcher",
                        owner->service_name);
        T("SDKCtl %s: Query reply is received for query %p ID %d. Reply size is %d",
          dispatcher->sdkctl->service_name, answered, answered->header.query_id,
          *answered->response_size);
        /* Finish the query and drop the dispatcher's reference to it. */
        _on_sdkctl_query_completed(answered);
        sdkctl_query_release(answered);
    }

    /* Next I/O cycle: wait for a packet header. */
    dispatcher->state = SDKCTL_IODISP_EXPECT_HEADER;
    async_socket_read_rel(owner->as,
                          &dispatcher->packet_header,
                          sizeof(SDKCtlPacketHeader),
                          _on_sdkctl_io_dispatcher_io,
                          dispatcher,
                          -1);

    return ASIO_ACTION_DONE;
}
/* Main loop of the I/O dispatcher: an I/O callback invoked for every event on
 * the dispatcher's reader.
 * Param:
 *  io_opaque - SDKCtlIODispatcher instance associated with the reader.
 *  asio - Read I/O descriptor.
 *  status - I/O status.
 * Return:
 *  Action produced by the handler for the current dispatcher state when data
 *  has been read; ASIO_ACTION_ABORT for an unexpected I/O status;
 *  ASIO_ACTION_DONE otherwise.
 */
static AsyncIOAction
_on_sdkctl_io_dispatcher_io(void* io_opaque,
                            AsyncSocketIO* asio,
                            AsyncIOState status)
{
    SDKCtlIODispatcher* const disp = (SDKCtlIODispatcher*)io_opaque;
    SDKCtlSocket* const owner = disp->sdkctl;
    AsyncIOAction result = ASIO_ACTION_DONE;

    /* The socket must stay alive while this callback runs. */
    sdkctl_socket_reference(owner);

    if (status == ASIO_STATE_SUCCEEDED) {
        /* The requested chunk has been read: what it is depends on what the
         * dispatcher was waiting for. */
        switch (disp->state) {
            case SDKCTL_IODISP_EXPECT_HEADER:
                /* Generic packet header. */
                result = _on_io_dispatcher_packet_header(disp, asio);
                break;

            case SDKCTL_IODISP_EXPECT_QUERY_REPLY_HEADER:
                /* Remainder of a query reply header. */
                result = _on_io_dispatcher_query_reply_header(disp, asio);
                break;

            case SDKCTL_IODISP_EXPECT_QUERY_REPLY_DATA:
                /* Query reply data: the query can be completed. */
                result = _on_io_dispatcher_query_reply(disp, asio);
                break;

            case SDKCTL_IODISP_EXPECT_DATA:
                /* Data of a generic packet. */
                result = _on_io_dispatcher_packet(disp, asio);
                break;

            default:
                _ANDROID_ASSERT(0, "SDKCtl %s: Unexpected I/O dispacher state %d",
                                owner->service_name, disp->state);
                break;
        }
    } else {
        /* Something other than "data received" is going on with the I/O. */
        switch (status) {
            case ASIO_STATE_STARTED:
                /* Data has started to arrive. Drop the timeout on this I/O so
                 * the current dispatcher state can complete with no
                 * interruptions other than I/O failures. */
                async_socket_io_cancel_time_out(asio);
                break;

            case ASIO_STATE_FAILED:
                _on_io_dispatcher_io_failure(disp, asio);
                break;

            case ASIO_STATE_TIMED_OUT:
                /* Not expected: the dispatcher registers its readers without
                 * an expiration time. */
                _ANDROID_ASSERT(0,
                    "SDKCtl %s: We should never receive ASIO_STATE_TIMED_OUT in SDKCtl I/O dispatcher.",
                    owner->service_name);
                break;

            case ASIO_STATE_CANCELLED:
                /* Cancellation happens while a disconnection is being
                 * handled; the dispatcher will be reset, so its state does
                 * not need to be preserved here. */
                _on_io_dispatcher_io_cancelled(disp, asio);
                break;

            case ASIO_STATE_FINISHED:
                break;

            default:
                _ANDROID_ASSERT(0, "SDKCtl %s: Unexpected I/O status %d in the dispatcher",
                                owner->service_name, status);
                /* Handled as a protocol failure. */
                errno = EINVAL;
                _on_io_dispatcher_io_failure(disp, asio);
                result = ASIO_ACTION_ABORT;
                break;
        }
    }

    sdkctl_socket_release(owner);
    return result;
}
/********************************************************************************
* SDKCtlSocket internals.
*******************************************************************************/
/* Cancels every query that is active on this socket: first the one being
 * completed by the I/O dispatcher (if any), then each query pulled from the
 * socket's query list. Each query's client callback is told
 * ASIO_STATE_CANCELLED, and the query is released. */
static void
_sdkctl_socket_cancel_all_queries(SDKCtlSocket* sdkctl)
{
    SDKCtlIODispatcher* const disp = &sdkctl->io_dispatcher;
    SDKCtlQuery* const in_dispatcher = disp->current_query;
    SDKCtlQuery* listed;

    /* The query owned by the dispatcher goes first. */
    if (in_dispatcher != NULL) {
        disp->current_query = NULL;
        _on_sdkctl_query_cancelled(in_dispatcher);
        sdkctl_query_release(in_dispatcher);
    }

    /* Then the list is drained one query at a time. */
    while ((listed = _sdkctl_socket_pull_first_query(sdkctl)) != NULL) {
        _sdkctl_query_cancel_timeout(listed);
        listed->query_cb(listed->query_opaque, listed, ASIO_STATE_CANCELLED);
        sdkctl_query_release(listed);
    }
}
/* Cancels all packets that are active on this socket.
 * Currently there is nothing to cancel: this is an intentional no-op. */
static void
_sdkctl_socket_cancel_all_packets(SDKCtlSocket* sdkctl)
{
    (void)sdkctl;
}
/* Cancels all I/O that is active on this socket: queries first, then
 * packets. */
static void
_sdkctl_socket_cancel_all_io(SDKCtlSocket* sdkctl)
{
    _sdkctl_socket_cancel_all_queries(sdkctl);

    _sdkctl_socket_cancel_all_packets(sdkctl);
}
/* Disconnects the AsyncSocket behind an SDKCtlSocket (when there is one),
 * cancels all I/O active on it, resets the I/O dispatcher, and marks both the
 * socket and the port as disconnected. */
static void
_sdkctl_socket_disconnect_socket(SDKCtlSocket* sdkctl)
{
    AsyncSocket* const transport = sdkctl->as;

    if (transport != NULL) {
        /* Disconnecting triggers the I/O cancellation callbacks. */
        async_socket_disconnect(transport);

        /* Whatever is still active on this socket is cancelled... */
        _sdkctl_socket_cancel_all_io(sdkctl);

        /* ...and the dispatcher starts over. */
        _sdkctl_io_dispatcher_reset(sdkctl);
    }

    sdkctl->port_status = SDKCTL_PORT_DISCONNECTED;
    sdkctl->state = SDKCTL_SOCKET_DISCONNECTED;
}
/* Destroys an SDKCtlSocket instance: disconnects and releases its
 * AsyncSocket, frees the looper and the service name, empties the recycler,
 * and frees the descriptor itself. NULL is accepted and ignored. */
static void
_sdkctl_socket_free(SDKCtlSocket* sdkctl)
{
    if (sdkctl == NULL) {
        return;
    }

    T("SDKCtl %s: descriptor is destroing.", sdkctl->service_name);

    /* The underlying socket is disconnected before it is released. */
    if (sdkctl->as != NULL) {
        async_socket_disconnect(sdkctl->as);
        async_socket_release(sdkctl->as);
    }

    /* Remaining resources owned by the descriptor. */
    if (sdkctl->looper != NULL) {
        looper_free(sdkctl->looper);
    }
    free(sdkctl->service_name);   /* free(NULL) is a no-op. */
    _sdkctl_socket_empty_recycler(sdkctl);

    AFREE(sdkctl);
}
/********************************************************************************
* SDK Control Socket connection callbacks.
*******************************************************************************/
/* Initiates handshake query when SDK controller socket is connected. */
static void _sdkctl_do_handshake(SDKCtlSocket* sdkctl);
/* A socket connection has been established.
 * The client is notified first. When it answers ASIO_ACTION_DONE, the I/O
 * dispatcher is started and the handshake with the SdkController service is
 * initiated. The client's answer is returned in all cases. */
static AsyncIOAction
_on_async_socket_connected(SDKCtlSocket* sdkctl)
{
    AsyncIOAction verdict;

    D("SDKCtl %s: Socket is connected.", sdkctl->service_name);

    /* Tell the client that the connection is up. */
    verdict = sdkctl->on_socket_connection(sdkctl->opaque, sdkctl,
                                           ASIO_STATE_SUCCEEDED);
    if (verdict != ASIO_ACTION_DONE) {
        /* The client didn't like something about this connection. */
        return verdict;
    }

    /* Main I/O dispatcher first, handshake second. */
    _sdkctl_io_dispatcher_start(sdkctl);
    _sdkctl_do_handshake(sdkctl);

    return verdict;
}
/* Handles a lost connection with the SdkController service.
 * Disconnects the socket and reports ASIO_STATE_FAILED to the client. An
 * ASIO_ACTION_DONE answer is turned into ASIO_ACTION_RETRY (reconnecting is
 * the default reaction); on retry the socket is put into the connecting
 * state. Returns the resulting action. */
static AsyncIOAction
_on_async_socket_disconnected(SDKCtlSocket* sdkctl)
{
    AsyncIOAction reply;

    D("SDKCtl %s: Socket has been disconnected.", sdkctl->service_name);

    _sdkctl_socket_disconnect_socket(sdkctl);

    reply = sdkctl->on_socket_connection(sdkctl->opaque, sdkctl,
                                         ASIO_STATE_FAILED);
    switch (reply) {
        case ASIO_ACTION_DONE:
            /* Default action for a disconnect: reestablish the connection. */
        case ASIO_ACTION_RETRY:
            sdkctl->state = SDKCTL_SOCKET_CONNECTING;
            return ASIO_ACTION_RETRY;

        default:
            return reply;
    }
}
/* An entry point for all socket connection events.
 * Here we will dispatch connection events to appropriate handlers.
 * Param:
 *  client_opaque - SDKCtlSocket instance.
 *  as - AsyncSocket that reports the event (not used here).
 *  status - Connection status being reported.
 * Return:
 *  - On success: the action produced by _on_async_socket_connected(), i.e.
 *    the client's answer to the connection notification.
 *  - On failure while connected: the action produced by
 *    _on_async_socket_disconnected().
 *  - ASIO_ACTION_RETRY for everything else.
 */
static AsyncIOAction
_on_async_socket_connection(void* client_opaque,
                            AsyncSocket* as,
                            AsyncIOState status)
{
    AsyncIOAction action = ASIO_ACTION_DONE;
    SDKCtlSocket* const sdkctl = (SDKCtlSocket*)client_opaque;

    /* Reference the socket while in this callback. */
    sdkctl_socket_reference(sdkctl);

    switch (status) {
        case ASIO_STATE_SUCCEEDED:
            sdkctl->state = SDKCTL_SOCKET_CONNECTED;
            /* The client may refuse the connection; its answer must be handed
             * back to the caller rather than being dropped. */
            action = _on_async_socket_connected(sdkctl);
            if (action != ASIO_ACTION_DONE) {
                /* The connection has not been accepted, so the socket must
                 * not keep reporting itself as connected. */
                sdkctl->state = (action == ASIO_ACTION_RETRY)
                                    ? SDKCTL_SOCKET_CONNECTING
                                    : SDKCTL_SOCKET_DISCONNECTED;
            }
            break;

        case ASIO_STATE_FAILED:
            if (sdkctl->state == SDKCTL_SOCKET_CONNECTED) {
                /* This is disconnection condition. */
                action = _on_async_socket_disconnected(sdkctl);
            } else {
                /* An error has occurred while attempting to connect to socket.
                 * Lets try again... */
                action = ASIO_ACTION_RETRY;
            }
            break;

        case ASIO_STATE_RETRYING:
        default:
            action = ASIO_ACTION_RETRY;
            break;
    }

    sdkctl_socket_release(sdkctl);
    return action;
}
/********************************************************************************
* SDK Control Socket public API
*******************************************************************************/
/* Creates a new, disconnected SDKCtlSocket descriptor holding one reference.
 * Param:
 *  reconnect_to - Stored in the descriptor and later passed to
 *      async_socket_new() when connecting.
 *  service_name - Name of the service; the descriptor keeps its own copy.
 *  on_socket_connection, on_port_connection, on_message - Client callbacks.
 *  opaque - Client value passed back to the callbacks.
 * Return:
 *  The new descriptor, or NULL when the I/O looper could not be created (in
 *  which case on_socket_connection is invoked with ASIO_STATE_FAILED before
 *  the descriptor is freed).
 */
SDKCtlSocket*
sdkctl_socket_new(int reconnect_to,
                  const char* service_name,
                  on_sdkctl_socket_connection_cb on_socket_connection,
                  on_sdkctl_port_connection_cb on_port_connection,
                  on_sdkctl_message_cb on_message,
                  void* opaque)
{
    SDKCtlSocket* sdkctl;
    ANEW0(sdkctl);

    /* Identity and client hooks. */
    sdkctl->service_name = ASTRDUP(service_name);
    sdkctl->opaque = opaque;
    sdkctl->on_socket_connection = on_socket_connection;
    sdkctl->on_port_connection = on_port_connection;
    sdkctl->on_message = on_message;

    /* Connection state. */
    sdkctl->as = NULL;
    sdkctl->reconnect_to = reconnect_to;
    sdkctl->state = SDKCTL_SOCKET_DISCONNECTED;
    sdkctl->port_status = SDKCTL_PORT_DISCONNECTED;

    /* Queries. */
    sdkctl->next_query_id = 0;
    sdkctl->query_head = NULL;
    sdkctl->query_tail = NULL;

    /* Recycler. */
    sdkctl->recycler = NULL;
    sdkctl->recycler_block_size = 0;
    sdkctl->recycler_max = 0;
    sdkctl->recycler_count = 0;

    sdkctl->ref_count = 1;

    T("SDKCtl %s: descriptor is created.", sdkctl->service_name);

    sdkctl->looper = looper_newCore();
    if (sdkctl->looper != NULL) {
        return sdkctl;
    }

    /* No looper: report the failure and get rid of the descriptor. */
    E("Unable to create I/O looper for SDKCtl socket '%s'",
      service_name);
    on_socket_connection(opaque, sdkctl, ASIO_STATE_FAILED);
    _sdkctl_socket_free(sdkctl);
    return NULL;
}
/* Adds a reference to the socket. Returns the new reference count. */
int
sdkctl_socket_reference(SDKCtlSocket* sdkctl)
{
    assert(sdkctl->ref_count > 0);
    return ++sdkctl->ref_count;
}
/* Drops a reference to the socket, destroying it when the last one is gone.
 * Returns the remaining reference count (0 when the socket has been freed). */
int
sdkctl_socket_release(SDKCtlSocket* sdkctl)
{
    assert(sdkctl->ref_count > 0);

    if (--sdkctl->ref_count != 0) {
        return sdkctl->ref_count;
    }

    /* That was the last reference: the descriptor goes away. */
    _sdkctl_socket_free(sdkctl);
    return 0;
}
/* Configures the socket's memory recycler.
 * Param:
 *  data_size - Size of the data part of a recycled block; sizeof(SDKCtlQuery)
 *      is added to it to get the block size.
 *  max_recycled_num - Stored as the recycler's maximum.
 * The call is ignored when the recycler already holds blocks.
 */
void
sdkctl_init_recycler(SDKCtlSocket* sdkctl,
                     uint32_t data_size,
                     int max_recycled_num)
{
    if (sdkctl->recycler != NULL) {
        D("SDKCtl %s: Recycler is already initialized. Ignoring recycler init.",
          sdkctl->service_name);
        return;
    }

    /* SDKCtlQuery is the largest descriptor that precedes the data. */
    sdkctl->recycler_block_size =
        (uint32_t)(data_size + sizeof(SDKCtlQuery));
    sdkctl->recycler_count = 0;
    sdkctl->recycler_max = max_recycled_num;
}
/* Creates the AsyncSocket for the given port and starts connecting it.
 * Param:
 *  port - Port passed to async_socket_new().
 *  retry_to - Passed to async_socket_connect().
 * When the AsyncSocket cannot be allocated, the client's connection callback
 * is invoked with ASIO_STATE_FAILED.
 */
void
sdkctl_socket_connect(SDKCtlSocket* sdkctl, int port, int retry_to)
{
    T("SDKCtl %s: Handling connect request to port %d, retrying in %dms...",
      sdkctl->service_name, port, retry_to);

    sdkctl->state = SDKCTL_SOCKET_CONNECTING;
    sdkctl->as = async_socket_new(port,
                                  sdkctl->reconnect_to,
                                  _on_async_socket_connection,
                                  sdkctl,
                                  sdkctl->looper);

    if (sdkctl->as != NULL) {
        async_socket_connect(sdkctl->as, retry_to);
        return;
    }

    E("Unable to allocate AsyncSocket for SDKCtl socket '%s'",
      sdkctl->service_name);
    sdkctl->on_socket_connection(sdkctl->opaque, sdkctl, ASIO_STATE_FAILED);
}
/* Disconnects the socket, and connects it again.
 * When an AsyncSocket already exists it is reconnected (the port argument is
 * not used in that case); otherwise a regular connect to 'port' is made. */
void
sdkctl_socket_reconnect(SDKCtlSocket* sdkctl, int port, int retry_to)
{
    T("SDKCtl %s: Handling reconnection request to port %d, retrying in %dms...",
      sdkctl->service_name, port, retry_to);

    _sdkctl_socket_disconnect_socket(sdkctl);

    if (sdkctl->as != NULL) {
        sdkctl->state = SDKCTL_SOCKET_CONNECTING;
        async_socket_reconnect(sdkctl->as, retry_to);
        return;
    }

    sdkctl_socket_connect(sdkctl, port, retry_to);
}
/* Disconnects the socket: public wrapper around
 * _sdkctl_socket_disconnect_socket(). */
void
sdkctl_socket_disconnect(SDKCtlSocket* sdkctl)
{
    T("SDKCtl %s: Handling disconnect request.", sdkctl->service_name);

    _sdkctl_socket_disconnect_socket(sdkctl);
}
/* Returns 1 when the socket state is SDKCTL_SOCKET_CONNECTED, 0 otherwise. */
int
sdkctl_socket_is_connected(SDKCtlSocket* sdkctl)
{
    if (sdkctl->state == SDKCTL_SOCKET_CONNECTED) {
        return 1;
    }
    return 0;
}
/* Returns 1 when the port status is SDKCTL_PORT_ENABLED, 0 otherwise. */
int
sdkctl_socket_is_port_ready(SDKCtlSocket* sdkctl)
{
    if (sdkctl->port_status == SDKCTL_PORT_ENABLED) {
        return 1;
    }
    return 0;
}
/* Returns the port status currently recorded for the socket. */
SdkCtlPortStatus
sdkctl_socket_get_port_status(SDKCtlSocket* sdkctl)
{
    const SdkCtlPortStatus current = sdkctl->port_status;
    return current;
}
/* Returns 0 when the recorded port status is one of the handshake failures
 * (duplicate connection, unknown query, unknown response), 1 otherwise. */
int
sdkctl_socket_is_handshake_ok(SDKCtlSocket* sdkctl)
{
    const int handshake_failed =
        sdkctl->port_status == SDKCTL_HANDSHAKE_DUP ||
        sdkctl->port_status == SDKCTL_HANDSHAKE_UNKNOWN_QUERY ||
        sdkctl->port_status == SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE;

    return handshake_failed ? 0 : 1;
}
/********************************************************************************
* Handshake query
*******************************************************************************/
/*
* Handshake result values.
*/
/* Handshake has completed successfully, and service-side port is connected. */
#define SDKCTL_HANDSHAKE_RESP_CONNECTED 0
/* Handshake has completed successfully, but service-side port is not connected. */
#define SDKCTL_HANDSHAKE_RESP_NOPORT 1
/* Handshake has failed due to duplicate connection request. */
#define SDKCTL_HANDSHAKE_RESP_DUP -1
/* Handshake has failed due to unknown query. */
#define SDKCTL_HANDSHAKE_RESP_QUERY_UNKNOWN -2
/* A callback that is invoked on handshake I/O events.
 * Param:
 *  query_opaque - SDKCtlSocket instance that has sent the handshake query.
 *  query - The handshake query.
 *  status - I/O status of the query.
 *
 * On success the response (an int-sized SDKCTL_HANDSHAKE_RESP_XXX value) is
 * translated into a port status, which is stored in the socket and reported
 * through on_port_connection. A response that is missing, or shorter than an
 * int, is reported as SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE.
 *
 * On failure, timeout, or cancellation the client's socket connection
 * callback is invoked with ASIO_STATE_FAILED.
 *
 * Always returns ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_handshake_io(void* query_opaque,
                 SDKCtlQuery* query,
                 AsyncIOState status)
{
    SDKCtlSocket* const sdkctl = (SDKCtlSocket*)query_opaque;

    if (status == ASIO_STATE_SUCCEEDED) {
        const void* const reply = *query->response_buffer;
        SdkCtlPortStatus handshake_status;
        int res = 0;

        if (reply == NULL || *query->response_size < sizeof(res)) {
            /* The reply comes from the peer, and may carry no data at all (in
             * which case there is no buffer to read from). Don't read what
             * has not been received. */
            E("SDKCtl %s: Handshake response is too short: %u bytes",
              sdkctl->service_name, (unsigned)*query->response_size);
            handshake_status = SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE;
        } else {
            /* Copy the value out instead of dereferencing the buffer as int:
             * the buffer is not guaranteed to be aligned for int. */
            memcpy(&res, reply, sizeof(res));
            switch (res) {
                case SDKCTL_HANDSHAKE_RESP_CONNECTED:
                    D("SDKCtl %s: Handshake succeeded. Port is connected",
                      sdkctl->service_name);
                    handshake_status = SDKCTL_HANDSHAKE_CONNECTED;
                    break;

                case SDKCTL_HANDSHAKE_RESP_NOPORT:
                    D("SDKCtl %s: Handshake succeeded. Port is not connected",
                      sdkctl->service_name);
                    handshake_status = SDKCTL_HANDSHAKE_NO_PORT;
                    break;

                case SDKCTL_HANDSHAKE_RESP_DUP:
                    D("SDKCtl %s: Handshake failed: duplicate connection.",
                      sdkctl->service_name);
                    handshake_status = SDKCTL_HANDSHAKE_DUP;
                    break;

                case SDKCTL_HANDSHAKE_RESP_QUERY_UNKNOWN:
                    D("SDKCtl %s: Handshake failed: unknown query.",
                      sdkctl->service_name);
                    handshake_status = SDKCTL_HANDSHAKE_UNKNOWN_QUERY;
                    break;

                default:
                    E("SDKCtl %s: Unknown handshake response: %d",
                      sdkctl->service_name, res);
                    handshake_status = SDKCTL_HANDSHAKE_UNKNOWN_RESPONSE;
                    break;
            }
        }

        sdkctl->port_status = handshake_status;
        sdkctl->on_port_connection(sdkctl->opaque, sdkctl, handshake_status);
    } else {
        /* Something is going on with the handshake... */
        switch (status) {
            case ASIO_STATE_FAILED:
            case ASIO_STATE_TIMED_OUT:
            case ASIO_STATE_CANCELLED:
                D("SDKCtl %s: Handshake failed: I/O state %d. Error: %d -> %s",
                  sdkctl->service_name, status, errno, strerror(errno));
                sdkctl->on_socket_connection(sdkctl->opaque, sdkctl,
                                             ASIO_STATE_FAILED);
                break;

            default:
                /* Transitional state: nothing to report. */
                break;
        }
    }

    return ASIO_ACTION_DONE;
}
/* I/O callback for the endianness byte that precedes the handshake.
 * Param:
 *  io_opaque - SDKCtlSocket instance that has sent the byte.
 *  asio - Write I/O descriptor.
 *  status - I/O status.
 * Once the byte has been sent, the handshake query (carrying the service
 * name, with a 3000 timeout) is built and sent. On failure, timeout, or
 * cancellation the client's socket connection callback is invoked with
 * ASIO_STATE_FAILED. Always returns ASIO_ACTION_DONE.
 */
static AsyncIOAction
_on_sdkctl_endianness_io(void* io_opaque,
                         AsyncSocketIO* asio,
                         AsyncIOState status)
{
    SDKCtlSocket* const owner = (SDKCtlSocket*)io_opaque;

    if (status == ASIO_STATE_SUCCEEDED) {
        SDKCtlQuery* handshake;

        /* Endianness is reported: time for the handshake query. */
        D("SDKCtl %s: Sending handshake query...", owner->service_name);
        handshake = sdkctl_query_build_and_send(owner,
                                                SDKCTL_QUERY_HANDSHAKE,
                                                strlen(owner->service_name),
                                                owner->service_name,
                                                NULL,
                                                NULL,
                                                _on_handshake_io,
                                                owner,
                                                3000);
        /* The creator's reference is not needed here. */
        sdkctl_query_release(handshake);
        return ASIO_ACTION_DONE;
    }

    if (status == ASIO_STATE_FAILED ||
        status == ASIO_STATE_TIMED_OUT ||
        status == ASIO_STATE_CANCELLED) {
        /* Reporting endianness didn't work out. */
        D("SDKCtl %s: endianness failed: I/O state %d. Error: %d -> %s",
          owner->service_name, status, errno, strerror(errno));
        owner->on_socket_connection(owner->opaque, owner, ASIO_STATE_FAILED);
    }

    return ASIO_ACTION_DONE;
}
/* Initiates the handshake by sending a single byte that reports the host's
 * endianness (1 when HOST_WORDS_BIGENDIAN is defined, 0 otherwise). The
 * handshake query itself is sent from _on_sdkctl_endianness_io() once this
 * byte has been written. */
static void
_sdkctl_do_handshake(SDKCtlSocket* sdkctl)
{
#ifdef HOST_WORDS_BIGENDIAN
    static const char _host_end = 1;
#else
    static const char _host_end = 0;
#endif

    D("SDKCtl %s: Sending endianness: %d", sdkctl->service_name, _host_end);

    /* No structured data can be sent to the SDK controller before it knows
     * the endianness of the host. */
    async_socket_write_rel(sdkctl->as,
                           &_host_end,
                           1,
                           _on_sdkctl_endianness_io,
                           sdkctl,
                           3000);
}