Merge remote-tracking branch 'kwolf/for-anthony' into staging
diff --git a/block.c b/block.c
index a8a013a..03a21d8 100644
--- a/block.c
+++ b/block.c
@@ -437,6 +437,33 @@
     return 0;
 }
 
+/**
+ * Replace the cache bits (BDRV_O_CACHE_MASK) in *flags with those for @mode.
+ *
+ * Return 0 on success, -1 if @mode is unknown (cache bits are cleared anyway).
+ */
+int bdrv_parse_cache_flags(const char *mode, int *flags)
+{
+    *flags &= ~BDRV_O_CACHE_MASK;
+
+    if (!strcmp(mode, "off") || !strcmp(mode, "none")) {
+        *flags |= BDRV_O_NOCACHE | BDRV_O_CACHE_WB;
+    } else if (!strcmp(mode, "directsync")) {
+        *flags |= BDRV_O_NOCACHE;
+    } else if (!strcmp(mode, "writeback")) {
+        *flags |= BDRV_O_CACHE_WB;
+    } else if (!strcmp(mode, "unsafe")) {
+        *flags |= BDRV_O_CACHE_WB;
+        *flags |= BDRV_O_NO_FLUSH;
+    } else if (!strcmp(mode, "writethrough")) {
+        /* this is the default: no cache flag is set */
+    } else {
+        return -1;
+    }
+
+    return 0;
+}
+
 /*
  * Common part for opening disk images and files
  */
@@ -1163,8 +1190,8 @@
         return ret;
     }
 
-    /* No flush needed for cache=writethrough, it uses O_DSYNC */
-    if ((bs->open_flags & BDRV_O_CACHE_MASK) != 0) {
+    /* No flush needed for cache modes that use O_DSYNC */
+    if ((bs->open_flags & BDRV_O_CACHE_WB) != 0) {
         bdrv_flush(bs);
     }
 
@@ -1888,11 +1915,19 @@
                         " wr_bytes=%" PRId64
                         " rd_operations=%" PRId64
                         " wr_operations=%" PRId64
+                        " flush_operations=%" PRId64
+                        " wr_total_time_ns=%" PRId64
+                        " rd_total_time_ns=%" PRId64
+                        " flush_total_time_ns=%" PRId64
                         "\n",
                         qdict_get_int(qdict, "rd_bytes"),
                         qdict_get_int(qdict, "wr_bytes"),
                         qdict_get_int(qdict, "rd_operations"),
-                        qdict_get_int(qdict, "wr_operations"));
+                        qdict_get_int(qdict, "wr_operations"),
+                        qdict_get_int(qdict, "flush_operations"),
+                        qdict_get_int(qdict, "wr_total_time_ns"),
+                        qdict_get_int(qdict, "rd_total_time_ns"),
+                        qdict_get_int(qdict, "flush_total_time_ns"));
 }
 
 void bdrv_stats_print(Monitor *mon, const QObject *data)
@@ -1910,12 +1945,22 @@
                              "'wr_bytes': %" PRId64 ","
                              "'rd_operations': %" PRId64 ","
                              "'wr_operations': %" PRId64 ","
-                             "'wr_highest_offset': %" PRId64
+                             "'wr_highest_offset': %" PRId64 ","
+                             "'flush_operations': %" PRId64 ","
+                             "'wr_total_time_ns': %" PRId64 ","
+                             "'rd_total_time_ns': %" PRId64 ","
+                             "'flush_total_time_ns': %" PRId64
                              "} }",
-                             bs->rd_bytes, bs->wr_bytes,
-                             bs->rd_ops, bs->wr_ops,
+                             bs->nr_bytes[BDRV_ACCT_READ],
+                             bs->nr_bytes[BDRV_ACCT_WRITE],
+                             bs->nr_ops[BDRV_ACCT_READ],
+                             bs->nr_ops[BDRV_ACCT_WRITE],
                              bs->wr_highest_sector *
-                             (uint64_t)BDRV_SECTOR_SIZE);
+                             (uint64_t)BDRV_SECTOR_SIZE,
+                             bs->nr_ops[BDRV_ACCT_FLUSH],
+                             bs->total_time_ns[BDRV_ACCT_WRITE],
+                             bs->total_time_ns[BDRV_ACCT_READ],
+                             bs->total_time_ns[BDRV_ACCT_FLUSH]);
     dict  = qobject_to_qdict(res);
 
     if (*bs->device_name) {
@@ -2229,7 +2274,6 @@
     return buf;
 }
 
-
 /**************************************************************/
 /* async I/Os */
 
@@ -2238,7 +2282,6 @@
                                  BlockDriverCompletionFunc *cb, void *opaque)
 {
     BlockDriver *drv = bs->drv;
-    BlockDriverAIOCB *ret;
 
     trace_bdrv_aio_readv(bs, sector_num, nb_sectors, opaque);
 
@@ -2247,16 +2290,8 @@
     if (bdrv_check_request(bs, sector_num, nb_sectors))
         return NULL;
 
-    ret = drv->bdrv_aio_readv(bs, sector_num, qiov, nb_sectors,
-                              cb, opaque);
-
-    if (ret) {
-        /* Update stats even though technically transfer has not happened. */
-        bs->rd_bytes += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
-        bs->rd_ops++;
-    }
-
-    return ret;
+    return drv->bdrv_aio_readv(bs, sector_num, qiov, nb_sectors,
+                               cb, opaque);
 }
 
 typedef struct BlockCompleteData {
@@ -2323,9 +2358,6 @@
                                cb, opaque);
 
     if (ret) {
-        /* Update stats even though technically transfer has not happened. */
-        bs->wr_bytes += (unsigned) nb_sectors * BDRV_SECTOR_SIZE;
-        bs->wr_ops ++;
         if (bs->wr_highest_sector < sector_num + nb_sectors - 1) {
             bs->wr_highest_sector = sector_num + nb_sectors - 1;
         }
@@ -3133,6 +3165,27 @@
     return bs->in_use;
 }
 
+void
+bdrv_acct_start(BlockDriverState *bs, BlockAcctCookie *cookie, int64_t bytes,
+        enum BlockAcctType type)
+{
+    assert(type < BDRV_MAX_IOTYPE);
+    /* Only @cookie is filled in here; @bs is not read or modified. */
+    cookie->bytes = bytes;
+    cookie->start_time_ns = get_clock();
+    cookie->type = type;
+}
+
+void
+bdrv_acct_done(BlockDriverState *bs, BlockAcctCookie *cookie)
+{
+    assert(cookie->type < BDRV_MAX_IOTYPE);
+    /* Add the request to @bs's per-type byte, op and elapsed-time totals. */
+    bs->nr_bytes[cookie->type] += cookie->bytes;
+    bs->nr_ops[cookie->type]++;
+    bs->total_time_ns[cookie->type] += get_clock() - cookie->start_time_ns;
+}
+
 int bdrv_img_create(const char *filename, const char *fmt,
                     const char *base_filename, const char *base_fmt,
                     char *options, uint64_t img_size, int flags)
diff --git a/block.h b/block.h
index a3bfaaf..3ac0b94 100644
--- a/block.h
+++ b/block.h
@@ -69,6 +69,7 @@
 BlockDriverState *bdrv_new(const char *device_name);
 void bdrv_make_anon(BlockDriverState *bs);
 void bdrv_delete(BlockDriverState *bs);
+int bdrv_parse_cache_flags(const char *mode, int *flags);
 int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags);
 int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
               BlockDriver *drv);
@@ -254,6 +255,23 @@
 void bdrv_set_in_use(BlockDriverState *bs, int in_use);
 int bdrv_in_use(BlockDriverState *bs);
 
+enum BlockAcctType {
+    BDRV_ACCT_READ,
+    BDRV_ACCT_WRITE,
+    BDRV_ACCT_FLUSH,
+    BDRV_MAX_IOTYPE,    /* upper bound, not a request type (see asserts) */
+};
+
+typedef struct BlockAcctCookie {
+    int64_t bytes;              /* request size given to bdrv_acct_start() */
+    int64_t start_time_ns;      /* get_clock() value at bdrv_acct_start() */
+    enum BlockAcctType type;    /* request type given to bdrv_acct_start() */
+} BlockAcctCookie;
+
+void bdrv_acct_start(BlockDriverState *bs, BlockAcctCookie *cookie,
+        int64_t bytes, enum BlockAcctType type);
+void bdrv_acct_done(BlockDriverState *bs, BlockAcctCookie *cookie);
+
 typedef enum {
     BLKDBG_L1_UPDATE,
 
@@ -306,3 +324,4 @@
 void bdrv_debug_event(BlockDriverState *bs, BlkDebugEvent event);
 
 #endif
+
diff --git a/block/curl.c b/block/curl.c
index 5c157bc..f3f61cc 100644
--- a/block/curl.c
+++ b/block/curl.c
@@ -229,6 +229,23 @@
             {
                 CURLState *state = NULL;
                 curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, (char**)&state);
+
+                /* ACBs for successful messages get completed in curl_read_cb */
+                if (msg->data.result != CURLE_OK) {
+                    int i;
+                    for (i = 0; i < CURL_NUM_ACB; i++) {
+                        CURLAIOCB *acb = state->acb[i];
+
+                        if (acb == NULL) {
+                            continue;
+                        }
+
+                        acb->common.cb(acb->common.opaque, -EIO);
+                        qemu_aio_release(acb);
+                        state->acb[i] = NULL;
+                    }
+                }
+
                 curl_clean_state(state);
                 break;
             }
@@ -277,7 +294,8 @@
     curl_easy_setopt(state->curl, CURLOPT_FOLLOWLOCATION, 1);
     curl_easy_setopt(state->curl, CURLOPT_NOSIGNAL, 1);
     curl_easy_setopt(state->curl, CURLOPT_ERRORBUFFER, state->errmsg);
-    
+    curl_easy_setopt(state->curl, CURLOPT_FAILONERROR, 1);
+
 #ifdef DEBUG_VERBOSE
     curl_easy_setopt(state->curl, CURLOPT_VERBOSE, 1);
 #endif
diff --git a/block/qcow.c b/block/qcow.c
index e155d3c..c8bfecc 100644
--- a/block/qcow.c
+++ b/block/qcow.c
@@ -159,6 +159,8 @@
             goto fail;
         bs->backing_file[len] = '\0';
     }
+
+    qemu_co_mutex_init(&s->lock);
     return 0;
 
  fail:
@@ -190,24 +192,6 @@
         return -1;
     if (AES_set_decrypt_key(keybuf, 128, &s->aes_decrypt_key) != 0)
         return -1;
-#if 0
-    /* test */
-    {
-        uint8_t in[16];
-        uint8_t out[16];
-        uint8_t tmp[16];
-        for(i=0;i<16;i++)
-            in[i] = i;
-        AES_encrypt(in, tmp, &s->aes_encrypt_key);
-        AES_decrypt(tmp, out, &s->aes_decrypt_key);
-        for(i = 0; i < 16; i++)
-            printf(" %02x", tmp[i]);
-        printf("\n");
-        for(i = 0; i < 16; i++)
-            printf(" %02x", out[i]);
-        printf("\n");
-    }
-#endif
     return 0;
 }
 
@@ -441,296 +425,178 @@
     return 0;
 }
 
-#if 0
-
-static int qcow_read(BlockDriverState *bs, int64_t sector_num,
-                     uint8_t *buf, int nb_sectors)
-{
-    BDRVQcowState *s = bs->opaque;
-    int ret, index_in_cluster, n;
-    uint64_t cluster_offset;
-
-    while (nb_sectors > 0) {
-        cluster_offset = get_cluster_offset(bs, sector_num << 9, 0, 0, 0, 0);
-        index_in_cluster = sector_num & (s->cluster_sectors - 1);
-        n = s->cluster_sectors - index_in_cluster;
-        if (n > nb_sectors)
-            n = nb_sectors;
-        if (!cluster_offset) {
-            if (bs->backing_hd) {
-                /* read from the base image */
-                ret = bdrv_read(bs->backing_hd, sector_num, buf, n);
-                if (ret < 0)
-                    return -1;
-            } else {
-                memset(buf, 0, 512 * n);
-            }
-        } else if (cluster_offset & QCOW_OFLAG_COMPRESSED) {
-            if (decompress_cluster(bs, cluster_offset) < 0)
-                return -1;
-            memcpy(buf, s->cluster_cache + index_in_cluster * 512, 512 * n);
-        } else {
-            ret = bdrv_pread(bs->file, cluster_offset + index_in_cluster * 512, buf, n * 512);
-            if (ret != n * 512)
-                return -1;
-            if (s->crypt_method) {
-                encrypt_sectors(s, sector_num, buf, buf, n, 0,
-                                &s->aes_decrypt_key);
-            }
-        }
-        nb_sectors -= n;
-        sector_num += n;
-        buf += n * 512;
-    }
-    return 0;
-}
-#endif
-
-typedef struct QCowAIOCB {
-    BlockDriverAIOCB common;
-    int64_t sector_num;
-    QEMUIOVector *qiov;
-    uint8_t *buf;
-    void *orig_buf;
-    int nb_sectors;
-    int n;
-    uint64_t cluster_offset;
-    uint8_t *cluster_data;
-    struct iovec hd_iov;
-    bool is_write;
-    QEMUBH *bh;
-    QEMUIOVector hd_qiov;
-    BlockDriverAIOCB *hd_aiocb;
-} QCowAIOCB;
-
-static void qcow_aio_cancel(BlockDriverAIOCB *blockacb)
-{
-    QCowAIOCB *acb = container_of(blockacb, QCowAIOCB, common);
-    if (acb->hd_aiocb)
-        bdrv_aio_cancel(acb->hd_aiocb);
-    qemu_aio_release(acb);
-}
-
-static AIOPool qcow_aio_pool = {
-    .aiocb_size         = sizeof(QCowAIOCB),
-    .cancel             = qcow_aio_cancel,
-};
-
-static QCowAIOCB *qcow_aio_setup(BlockDriverState *bs,
-        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
-        int is_write)
-{
-    QCowAIOCB *acb;
-
-    acb = qemu_aio_get(&qcow_aio_pool, bs, NULL, NULL);
-    if (!acb)
-        return NULL;
-    acb->hd_aiocb = NULL;
-    acb->sector_num = sector_num;
-    acb->qiov = qiov;
-    acb->is_write = is_write;
-
-    if (qiov->niov > 1) {
-        acb->buf = acb->orig_buf = qemu_blockalign(bs, qiov->size);
-        if (is_write)
-            qemu_iovec_to_buffer(qiov, acb->buf);
-    } else {
-        acb->buf = (uint8_t *)qiov->iov->iov_base;
-    }
-    acb->nb_sectors = nb_sectors;
-    acb->n = 0;
-    acb->cluster_offset = 0;
-    return acb;
-}
-
-static int qcow_aio_read_cb(void *opaque)
-{
-    QCowAIOCB *acb = opaque;
-    BlockDriverState *bs = acb->common.bs;
-    BDRVQcowState *s = bs->opaque;
-    int index_in_cluster;
-    int ret;
-
-    acb->hd_aiocb = NULL;
-
- redo:
-    /* post process the read buffer */
-    if (!acb->cluster_offset) {
-        /* nothing to do */
-    } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
-        /* nothing to do */
-    } else {
-        if (s->crypt_method) {
-            encrypt_sectors(s, acb->sector_num, acb->buf, acb->buf,
-                            acb->n, 0,
-                            &s->aes_decrypt_key);
-        }
-    }
-
-    acb->nb_sectors -= acb->n;
-    acb->sector_num += acb->n;
-    acb->buf += acb->n * 512;
-
-    if (acb->nb_sectors == 0) {
-        /* request completed */
-        return 0;
-    }
-
-    /* prepare next AIO request */
-    acb->cluster_offset = get_cluster_offset(bs, acb->sector_num << 9,
-                                             0, 0, 0, 0);
-    index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
-    acb->n = s->cluster_sectors - index_in_cluster;
-    if (acb->n > acb->nb_sectors)
-        acb->n = acb->nb_sectors;
-
-    if (!acb->cluster_offset) {
-        if (bs->backing_hd) {
-            /* read from the base image */
-            acb->hd_iov.iov_base = (void *)acb->buf;
-            acb->hd_iov.iov_len = acb->n * 512;
-            qemu_iovec_init_external(&acb->hd_qiov, &acb->hd_iov, 1);
-            qemu_co_mutex_unlock(&s->lock);
-            ret = bdrv_co_readv(bs->backing_hd, acb->sector_num,
-                                acb->n, &acb->hd_qiov);
-            qemu_co_mutex_lock(&s->lock);
-            if (ret < 0) {
-                return -EIO;
-            }
-        } else {
-            /* Note: in this case, no need to wait */
-            memset(acb->buf, 0, 512 * acb->n);
-            goto redo;
-        }
-    } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
-        /* add AIO support for compressed blocks ? */
-        if (decompress_cluster(bs, acb->cluster_offset) < 0) {
-            return -EIO;
-        }
-        memcpy(acb->buf,
-               s->cluster_cache + index_in_cluster * 512, 512 * acb->n);
-        goto redo;
-    } else {
-        if ((acb->cluster_offset & 511) != 0) {
-            return -EIO;
-        }
-        acb->hd_iov.iov_base = (void *)acb->buf;
-        acb->hd_iov.iov_len = acb->n * 512;
-        qemu_iovec_init_external(&acb->hd_qiov, &acb->hd_iov, 1);
-        qemu_co_mutex_unlock(&s->lock);
-        ret = bdrv_co_readv(bs->file,
-                            (acb->cluster_offset >> 9) + index_in_cluster,
-                            acb->n, &acb->hd_qiov);
-        qemu_co_mutex_lock(&s->lock);
-        if (ret < 0) {
-            return ret;
-        }
-    }
-
-    return 1;
-}
-
 static int qcow_co_readv(BlockDriverState *bs, int64_t sector_num,
                          int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVQcowState *s = bs->opaque;
-    QCowAIOCB *acb;
-    int ret;
+    int index_in_cluster;
+    int ret = 0, n;
+    uint64_t cluster_offset;
+    struct iovec hd_iov;
+    QEMUIOVector hd_qiov;
+    uint8_t *buf;
+    void *orig_buf;
 
-    acb = qcow_aio_setup(bs, sector_num, qiov, nb_sectors, 0);
+    if (qiov->niov > 1) {
+        buf = orig_buf = qemu_blockalign(bs, qiov->size); /* bounce buffer */
+    } else {
+        orig_buf = NULL;
+        buf = (uint8_t *)qiov->iov->iov_base;
+    }
 
     qemu_co_mutex_lock(&s->lock);
-    do {
-        ret = qcow_aio_read_cb(acb);
-    } while (ret > 0);
+
+    while (nb_sectors != 0) {
+        /* prepare next request, clamped to the end of the current cluster */
+        cluster_offset = get_cluster_offset(bs, sector_num << 9,
+                                                 0, 0, 0, 0);
+        index_in_cluster = sector_num & (s->cluster_sectors - 1);
+        n = s->cluster_sectors - index_in_cluster;
+        if (n > nb_sectors) {
+            n = nb_sectors;
+        }
+
+        if (!cluster_offset) {
+            if (bs->backing_hd) {
+                /* read from the base image */
+                hd_iov.iov_base = (void *)buf;
+                hd_iov.iov_len = n * 512;
+                qemu_iovec_init_external(&hd_qiov, &hd_iov, 1);
+                qemu_co_mutex_unlock(&s->lock);
+                ret = bdrv_co_readv(bs->backing_hd, sector_num,
+                                    n, &hd_qiov);
+                qemu_co_mutex_lock(&s->lock);
+                if (ret < 0) {
+                    goto fail;
+                }
+            } else {
+                /* Note: in this case, no need to wait */
+                memset(buf, 0, 512 * n);
+            }
+        } else if (cluster_offset & QCOW_OFLAG_COMPRESSED) {
+            /* add AIO support for compressed blocks ? */
+            if (decompress_cluster(bs, cluster_offset) < 0) {
+                goto fail;
+            }
+            memcpy(buf,
+                   s->cluster_cache + index_in_cluster * 512, 512 * n);
+        } else {
+            if ((cluster_offset & 511) != 0) {
+                goto fail;
+            }
+            hd_iov.iov_base = (void *)buf;
+            hd_iov.iov_len = n * 512;
+            qemu_iovec_init_external(&hd_qiov, &hd_iov, 1);
+            qemu_co_mutex_unlock(&s->lock);
+            ret = bdrv_co_readv(bs->file,
+                                (cluster_offset >> 9) + index_in_cluster,
+                                n, &hd_qiov);
+            qemu_co_mutex_lock(&s->lock);
+            if (ret < 0) {
+                break; /* return the error code from bs->file unchanged */
+            }
+            if (s->crypt_method) {
+                encrypt_sectors(s, sector_num, buf, buf,
+                                n, 0,
+                                &s->aes_decrypt_key);
+            }
+        }
+        ret = 0;
+
+        nb_sectors -= n;
+        sector_num += n;
+        buf += n * 512;
+    }
+
+done:
     qemu_co_mutex_unlock(&s->lock);
 
-    if (acb->qiov->niov > 1) {
-        qemu_iovec_from_buffer(acb->qiov, acb->orig_buf, acb->qiov->size);
-        qemu_vfree(acb->orig_buf);
+    if (qiov->niov > 1) { /* bounce buffer is copied back on error too */
+        qemu_iovec_from_buffer(qiov, orig_buf, qiov->size);
+        qemu_vfree(orig_buf);
     }
-    qemu_aio_release(acb);
 
     return ret;
-}
 
-static int qcow_aio_write_cb(void *opaque)
-{
-    QCowAIOCB *acb = opaque;
-    BlockDriverState *bs = acb->common.bs;
-    BDRVQcowState *s = bs->opaque;
-    int index_in_cluster;
-    uint64_t cluster_offset;
-    const uint8_t *src_buf;
-    int ret;
-
-    acb->hd_aiocb = NULL;
-
-    acb->nb_sectors -= acb->n;
-    acb->sector_num += acb->n;
-    acb->buf += acb->n * 512;
-
-    if (acb->nb_sectors == 0) {
-        /* request completed */
-        return 0;
-    }
-
-    index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
-    acb->n = s->cluster_sectors - index_in_cluster;
-    if (acb->n > acb->nb_sectors)
-        acb->n = acb->nb_sectors;
-    cluster_offset = get_cluster_offset(bs, acb->sector_num << 9, 1, 0,
-                                        index_in_cluster,
-                                        index_in_cluster + acb->n);
-    if (!cluster_offset || (cluster_offset & 511) != 0) {
-        return -EIO;
-    }
-    if (s->crypt_method) {
-        if (!acb->cluster_data) {
-            acb->cluster_data = g_malloc0(s->cluster_size);
-        }
-        encrypt_sectors(s, acb->sector_num, acb->cluster_data, acb->buf,
-                        acb->n, 1, &s->aes_encrypt_key);
-        src_buf = acb->cluster_data;
-    } else {
-        src_buf = acb->buf;
-    }
-
-    acb->hd_iov.iov_base = (void *)src_buf;
-    acb->hd_iov.iov_len = acb->n * 512;
-    qemu_iovec_init_external(&acb->hd_qiov, &acb->hd_iov, 1);
-    qemu_co_mutex_unlock(&s->lock);
-    ret = bdrv_co_writev(bs->file,
-                         (cluster_offset >> 9) + index_in_cluster,
-                         acb->n, &acb->hd_qiov);
-    qemu_co_mutex_lock(&s->lock);
-    if (ret < 0) {
-        return ret;
-    }
-    return 1;
+fail:
+    ret = -EIO; /* goto-fail paths drop any underlying error code */
+    goto done;
 }
 
 static int qcow_co_writev(BlockDriverState *bs, int64_t sector_num,
                           int nb_sectors, QEMUIOVector *qiov)
 {
     BDRVQcowState *s = bs->opaque;
-    QCowAIOCB *acb;
-    int ret;
+    int index_in_cluster;
+    uint64_t cluster_offset;
+    const uint8_t *src_buf;
+    int ret = 0, n;
+    uint8_t *cluster_data = NULL;
+    struct iovec hd_iov;
+    QEMUIOVector hd_qiov;
+    uint8_t *buf;
+    void *orig_buf;
 
     s->cluster_cache_offset = -1; /* disable compressed cache */
 
-    acb = qcow_aio_setup(bs, sector_num, qiov, nb_sectors, 1);
+    if (qiov->niov > 1) {
+        buf = orig_buf = qemu_blockalign(bs, qiov->size);
+        qemu_iovec_to_buffer(qiov, buf);
+    } else {
+        orig_buf = NULL;
+        buf = (uint8_t *)qiov->iov->iov_base;
+    }
 
     qemu_co_mutex_lock(&s->lock);
-    do {
-        ret = qcow_aio_write_cb(acb);
-    } while (ret > 0);
+
+    while (nb_sectors != 0) {
+        /* Each pass writes at most up to the end of the current cluster. */
+        index_in_cluster = sector_num & (s->cluster_sectors - 1);
+        n = s->cluster_sectors - index_in_cluster;
+        if (n > nb_sectors) {
+            n = nb_sectors;
+        }
+        cluster_offset = get_cluster_offset(bs, sector_num << 9, 1, 0,
+                                            index_in_cluster,
+                                            index_in_cluster + n);
+        if (!cluster_offset || (cluster_offset & 511) != 0) {
+            ret = -EIO;
+            break;
+        }
+        if (s->crypt_method) {
+            if (!cluster_data) {
+                cluster_data = g_malloc0(s->cluster_size);
+            }
+            encrypt_sectors(s, sector_num, cluster_data, buf,
+                            n, 1, &s->aes_encrypt_key);
+            src_buf = cluster_data;
+        } else {
+            src_buf = buf;
+        }
+
+        hd_iov.iov_base = (void *)src_buf;
+        hd_iov.iov_len = n * 512;
+        qemu_iovec_init_external(&hd_qiov, &hd_iov, 1);
+        qemu_co_mutex_unlock(&s->lock); /* lock is not held across the write */
+        ret = bdrv_co_writev(bs->file,
+                             (cluster_offset >> 9) + index_in_cluster,
+                             n, &hd_qiov);
+        qemu_co_mutex_lock(&s->lock);
+        if (ret < 0) {
+            break;
+        }
+        ret = 0;
+
+        nb_sectors -= n;
+        sector_num += n;
+        buf += n * 512;
+    }
     qemu_co_mutex_unlock(&s->lock);
 
-    if (acb->qiov->niov > 1) {
-        qemu_vfree(acb->orig_buf);
+    if (qiov->niov > 1) {
+        qemu_vfree(orig_buf);
     }
-    qemu_aio_release(acb);
+    g_free(cluster_data); /* from g_malloc0(): must be g_free(), not free() */
 
     return ret;
 }
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 9269dda..e06be64 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -53,7 +53,7 @@
     }
 
 #ifdef DEBUG_ALLOC2
-    printf("grow l1_table from %d to %d\n", s->l1_size, new_l1_size);
+    fprintf(stderr, "grow l1_table from %d to %d\n", s->l1_size, new_l1_size);
 #endif
 
     new_l1_size2 = sizeof(uint64_t) * new_l1_size;
@@ -381,10 +381,10 @@
  * For a given offset of the disk image, find the cluster offset in
  * qcow2 file. The offset is stored in *cluster_offset.
  *
- * on entry, *num is the number of contiguous clusters we'd like to
+ * on entry, *num is the number of contiguous sectors we'd like to
  * access following offset.
  *
- * on exit, *num is the number of contiguous clusters we can read.
+ * on exit, *num is the number of contiguous sectors we can read.
  *
  * Return 0, if the offset is found
  * Return -errno, otherwise.
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index 2a915be..9605367 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -422,7 +422,7 @@
     int ret;
 
 #ifdef DEBUG_ALLOC2
-    printf("update_refcount: offset=%" PRId64 " size=%" PRId64 " addend=%d\n",
+    fprintf(stderr, "update_refcount: offset=%" PRId64 " size=%" PRId64 " addend=%d\n",
            offset, length, addend);
 #endif
     if (length < 0) {
@@ -556,7 +556,7 @@
         }
     }
 #ifdef DEBUG_ALLOC2
-    printf("alloc_clusters: size=%" PRId64 " -> %" PRId64 "\n",
+    fprintf(stderr, "alloc_clusters: size=%" PRId64 " -> %" PRId64 "\n",
             size,
             (s->free_cluster_index - nb_clusters) << s->cluster_bits);
 #endif
@@ -680,24 +680,6 @@
 
 
 
-void qcow2_create_refcount_update(QCowCreateState *s, int64_t offset,
-    int64_t size)
-{
-    int refcount;
-    int64_t start, last, cluster_offset;
-    uint16_t *p;
-
-    start = offset & ~(s->cluster_size - 1);
-    last = (offset + size - 1)  & ~(s->cluster_size - 1);
-    for(cluster_offset = start; cluster_offset <= last;
-        cluster_offset += s->cluster_size) {
-        p = &s->refcount_block[cluster_offset >> s->cluster_bits];
-        refcount = be16_to_cpu(*p);
-        refcount++;
-        *p = cpu_to_be16(refcount);
-    }
-}
-
 /* update the refcounts of snapshots and the copied flag */
 int qcow2_update_snapshot_refcount(BlockDriverState *bs,
     int64_t l1_table_offset, int l1_size, int addend)
diff --git a/block/qcow2-snapshot.c b/block/qcow2-snapshot.c
index 3bd2a30..3e6bf8b 100644
--- a/block/qcow2-snapshot.c
+++ b/block/qcow2-snapshot.c
@@ -303,7 +303,10 @@
     if (qcow2_write_snapshots(bs) < 0)
         goto fail;
 #ifdef DEBUG_ALLOC
-    qcow2_check_refcounts(bs);
+    {
+      BdrvCheckResult result = {0};
+      qcow2_check_refcounts(bs, &result);
+    }
 #endif
     return 0;
  fail:
@@ -353,7 +356,10 @@
         goto fail;
 
 #ifdef DEBUG_ALLOC
-    qcow2_check_refcounts(bs);
+    {
+        BdrvCheckResult result = {0};
+        qcow2_check_refcounts(bs, &result);
+    }
 #endif
     return 0;
  fail:
@@ -390,7 +396,10 @@
         return ret;
     }
 #ifdef DEBUG_ALLOC
-    qcow2_check_refcounts(bs);
+    {
+        BdrvCheckResult result = {0};
+        qcow2_check_refcounts(bs, &result);
+    }
 #endif
     return 0;
 }
diff --git a/block/qcow2.c b/block/qcow2.c
index bfff6cd..b725d68 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -87,6 +87,7 @@
     while (offset < end_offset) {
 
 #ifdef DEBUG_EXT
+        BDRVQcowState *s = bs->opaque;
         /* Sanity check */
         if (offset > s->cluster_size)
             printf("qcow2_read_extension: suspicious offset %lu\n", offset);
@@ -280,7 +281,10 @@
     qemu_co_mutex_init(&s->lock);
 
 #ifdef DEBUG_ALLOC
-    qcow2_check_refcounts(bs);
+    {
+        BdrvCheckResult result = {0};
+        qcow2_check_refcounts(bs, &result);
+    }
 #endif
     return ret;
 
@@ -372,201 +376,127 @@
     return n1;
 }
 
-typedef struct QCowAIOCB {
-    BlockDriverAIOCB common;
-    int64_t sector_num;
-    QEMUIOVector *qiov;
-    int remaining_sectors;
-    int cur_nr_sectors;	/* number of sectors in current iteration */
-    uint64_t bytes_done;
-    uint64_t cluster_offset;
-    uint8_t *cluster_data;
-    bool is_write;
-    QEMUIOVector hd_qiov;
-    QEMUBH *bh;
-    QCowL2Meta l2meta;
-    QLIST_ENTRY(QCowAIOCB) next_depend;
-} QCowAIOCB;
-
-static void qcow2_aio_cancel(BlockDriverAIOCB *blockacb)
+static int qcow2_co_readv(BlockDriverState *bs, int64_t sector_num,
+                          int remaining_sectors, QEMUIOVector *qiov)
 {
-    QCowAIOCB *acb = container_of(blockacb, QCowAIOCB, common);
-    qemu_aio_release(acb);
-}
-
-static AIOPool qcow2_aio_pool = {
-    .aiocb_size         = sizeof(QCowAIOCB),
-    .cancel             = qcow2_aio_cancel,
-};
-
-/*
- * Returns 0 when the request is completed successfully, 1 when there is still
- * a part left to do and -errno in error cases.
- */
-static int qcow2_aio_read_cb(QCowAIOCB *acb)
-{
-    BlockDriverState *bs = acb->common.bs;
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster, n1;
     int ret;
+    int cur_nr_sectors; /* number of sectors in current iteration */
+    uint64_t cluster_offset = 0;
+    uint64_t bytes_done = 0;
+    QEMUIOVector hd_qiov;
+    uint8_t *cluster_data = NULL;
 
-    /* post process the read buffer */
-    if (!acb->cluster_offset) {
-        /* nothing to do */
-    } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
-        /* nothing to do */
-    } else {
-        if (s->crypt_method) {
-            qcow2_encrypt_sectors(s, acb->sector_num,  acb->cluster_data,
-                acb->cluster_data, acb->cur_nr_sectors, 0, &s->aes_decrypt_key);
-            qemu_iovec_reset(&acb->hd_qiov);
-            qemu_iovec_copy(&acb->hd_qiov, acb->qiov, acb->bytes_done,
-                acb->cur_nr_sectors * 512);
-            qemu_iovec_from_buffer(&acb->hd_qiov, acb->cluster_data,
-                512 * acb->cur_nr_sectors);
-        }
-    }
-
-    acb->remaining_sectors -= acb->cur_nr_sectors;
-    acb->sector_num += acb->cur_nr_sectors;
-    acb->bytes_done += acb->cur_nr_sectors * 512;
-
-    if (acb->remaining_sectors == 0) {
-        /* request completed */
-        return 0;
-    }
-
-    /* prepare next AIO request */
-    acb->cur_nr_sectors = acb->remaining_sectors;
-    if (s->crypt_method) {
-        acb->cur_nr_sectors = MIN(acb->cur_nr_sectors,
-            QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors);
-    }
-
-    ret = qcow2_get_cluster_offset(bs, acb->sector_num << 9,
-        &acb->cur_nr_sectors, &acb->cluster_offset);
-    if (ret < 0) {
-        return ret;
-    }
-
-    index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
-
-    qemu_iovec_reset(&acb->hd_qiov);
-    qemu_iovec_copy(&acb->hd_qiov, acb->qiov, acb->bytes_done,
-        acb->cur_nr_sectors * 512);
-
-    if (!acb->cluster_offset) {
-
-        if (bs->backing_hd) {
-            /* read from the base image */
-            n1 = qcow2_backing_read1(bs->backing_hd, &acb->hd_qiov,
-                acb->sector_num, acb->cur_nr_sectors);
-            if (n1 > 0) {
-                BLKDBG_EVENT(bs->file, BLKDBG_READ_BACKING_AIO);
-                qemu_co_mutex_unlock(&s->lock);
-                ret = bdrv_co_readv(bs->backing_hd, acb->sector_num,
-                                    n1, &acb->hd_qiov);
-                qemu_co_mutex_lock(&s->lock);
-                if (ret < 0) {
-                    return ret;
-                }
-            }
-            return 1;
-        } else {
-            /* Note: in this case, no need to wait */
-            qemu_iovec_memset(&acb->hd_qiov, 0, 512 * acb->cur_nr_sectors);
-            return 1;
-        }
-    } else if (acb->cluster_offset & QCOW_OFLAG_COMPRESSED) {
-        /* add AIO support for compressed blocks ? */
-        ret = qcow2_decompress_cluster(bs, acb->cluster_offset);
-        if (ret < 0) {
-            return ret;
-        }
-
-        qemu_iovec_from_buffer(&acb->hd_qiov,
-            s->cluster_cache + index_in_cluster * 512,
-            512 * acb->cur_nr_sectors);
-
-        return 1;
-    } else {
-        if ((acb->cluster_offset & 511) != 0) {
-            return -EIO;
-        }
-
-        if (s->crypt_method) {
-            /*
-             * For encrypted images, read everything into a temporary
-             * contiguous buffer on which the AES functions can work.
-             */
-            if (!acb->cluster_data) {
-                acb->cluster_data =
-                    g_malloc0(QCOW_MAX_CRYPT_CLUSTERS * s->cluster_size);
-            }
-
-            assert(acb->cur_nr_sectors <=
-                QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors);
-            qemu_iovec_reset(&acb->hd_qiov);
-            qemu_iovec_add(&acb->hd_qiov, acb->cluster_data,
-                512 * acb->cur_nr_sectors);
-        }
-
-        BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
-        qemu_co_mutex_unlock(&s->lock);
-        ret = bdrv_co_readv(bs->file,
-                            (acb->cluster_offset >> 9) + index_in_cluster,
-                            acb->cur_nr_sectors, &acb->hd_qiov);
-        qemu_co_mutex_lock(&s->lock);
-        if (ret < 0) {
-            return ret;
-        }
-    }
-
-    return 1;
-}
-
-static QCowAIOCB *qcow2_aio_setup(BlockDriverState *bs, int64_t sector_num,
-                                  QEMUIOVector *qiov, int nb_sectors,
-                                  BlockDriverCompletionFunc *cb,
-                                  void *opaque, int is_write)
-{
-    QCowAIOCB *acb;
-
-    acb = qemu_aio_get(&qcow2_aio_pool, bs, cb, opaque);
-    if (!acb)
-        return NULL;
-    acb->sector_num = sector_num;
-    acb->qiov = qiov;
-    acb->is_write = is_write;
-
-    qemu_iovec_init(&acb->hd_qiov, qiov->niov);
-
-    acb->bytes_done = 0;
-    acb->remaining_sectors = nb_sectors;
-    acb->cur_nr_sectors = 0;
-    acb->cluster_offset = 0;
-    acb->l2meta.nb_clusters = 0;
-    qemu_co_queue_init(&acb->l2meta.dependent_requests);
-    return acb;
-}
-
-static int qcow2_co_readv(BlockDriverState *bs, int64_t sector_num,
-                          int nb_sectors, QEMUIOVector *qiov)
-{
-    BDRVQcowState *s = bs->opaque;
-    QCowAIOCB *acb;
-    int ret;
-
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, NULL, NULL, 0);
+    qemu_iovec_init(&hd_qiov, qiov->niov);
 
     qemu_co_mutex_lock(&s->lock);
-    do {
-        ret = qcow2_aio_read_cb(acb);
-    } while (ret > 0);
+
+    while (remaining_sectors != 0) {
+
+        /* prepare next request */
+        cur_nr_sectors = remaining_sectors;
+        if (s->crypt_method) {
+            cur_nr_sectors = MIN(cur_nr_sectors,
+                QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors);
+        }
+
+        ret = qcow2_get_cluster_offset(bs, sector_num << 9,
+            &cur_nr_sectors, &cluster_offset);
+        if (ret < 0) {
+            goto fail;
+        }
+
+        index_in_cluster = sector_num & (s->cluster_sectors - 1);
+
+        qemu_iovec_reset(&hd_qiov);
+        qemu_iovec_copy(&hd_qiov, qiov, bytes_done,
+            cur_nr_sectors * 512);
+
+        if (!cluster_offset) {
+
+            if (bs->backing_hd) {
+                /* read from the base image */
+                n1 = qcow2_backing_read1(bs->backing_hd, &hd_qiov,
+                    sector_num, cur_nr_sectors);
+                if (n1 > 0) {
+                    BLKDBG_EVENT(bs->file, BLKDBG_READ_BACKING_AIO);
+                    qemu_co_mutex_unlock(&s->lock);
+                    ret = bdrv_co_readv(bs->backing_hd, sector_num,
+                                        n1, &hd_qiov);
+                    qemu_co_mutex_lock(&s->lock);
+                    if (ret < 0) {
+                        goto fail;
+                    }
+                }
+            } else {
+                /* Note: in this case, no need to wait */
+                qemu_iovec_memset(&hd_qiov, 0, 512 * cur_nr_sectors);
+            }
+        } else if (cluster_offset & QCOW_OFLAG_COMPRESSED) {
+            /* add AIO support for compressed blocks ? */
+            ret = qcow2_decompress_cluster(bs, cluster_offset);
+            if (ret < 0) {
+                goto fail;
+            }
+
+            qemu_iovec_from_buffer(&hd_qiov,
+                s->cluster_cache + index_in_cluster * 512,
+                512 * cur_nr_sectors);
+        } else {
+            if ((cluster_offset & 511) != 0) {
+                ret = -EIO;
+                goto fail;
+            }
+
+            if (s->crypt_method) {
+                /*
+                 * For encrypted images, read everything into a temporary
+                 * contiguous buffer on which the AES functions can work.
+                 */
+                if (!cluster_data) {
+                    cluster_data =
+                        g_malloc0(QCOW_MAX_CRYPT_CLUSTERS * s->cluster_size);
+                }
+
+                assert(cur_nr_sectors <=
+                    QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors);
+                qemu_iovec_reset(&hd_qiov);
+                qemu_iovec_add(&hd_qiov, cluster_data,
+                    512 * cur_nr_sectors);
+            }
+
+            BLKDBG_EVENT(bs->file, BLKDBG_READ_AIO);
+            qemu_co_mutex_unlock(&s->lock);
+            ret = bdrv_co_readv(bs->file,
+                                (cluster_offset >> 9) + index_in_cluster,
+                                cur_nr_sectors, &hd_qiov);
+            qemu_co_mutex_lock(&s->lock);
+            if (ret < 0) {
+                goto fail;
+            }
+            if (s->crypt_method) {
+                qcow2_encrypt_sectors(s, sector_num,  cluster_data,
+                    cluster_data, cur_nr_sectors, 0, &s->aes_decrypt_key);
+                qemu_iovec_reset(&hd_qiov);
+                qemu_iovec_copy(&hd_qiov, qiov, bytes_done,
+                    cur_nr_sectors * 512);
+                qemu_iovec_from_buffer(&hd_qiov, cluster_data,
+                    512 * cur_nr_sectors);
+            }
+        }
+
+        remaining_sectors -= cur_nr_sectors;
+        sector_num += cur_nr_sectors;
+        bytes_done += cur_nr_sectors * 512;
+    }
+    ret = 0;
+
+fail:
     qemu_co_mutex_unlock(&s->lock);
 
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    qemu_iovec_destroy(&hd_qiov);
+    g_free(cluster_data);
 
     return ret;
 }
@@ -586,104 +516,100 @@
     }
 }
 
-/*
- * Returns 0 when the request is completed successfully, 1 when there is still
- * a part left to do and -errno in error cases.
- */
-static int qcow2_aio_write_cb(QCowAIOCB *acb)
+static int qcow2_co_writev(BlockDriverState *bs,
+                           int64_t sector_num,
+                           int remaining_sectors,
+                           QEMUIOVector *qiov)
 {
-    BlockDriverState *bs = acb->common.bs;
     BDRVQcowState *s = bs->opaque;
     int index_in_cluster;
     int n_end;
     int ret;
+    int cur_nr_sectors; /* number of sectors in current iteration */
+    QCowL2Meta l2meta;
+    uint64_t cluster_offset;
+    QEMUIOVector hd_qiov;
+    uint64_t bytes_done = 0;
+    uint8_t *cluster_data = NULL;
 
-    ret = qcow2_alloc_cluster_link_l2(bs, &acb->l2meta);
+    l2meta.nb_clusters = 0;
+    qemu_co_queue_init(&l2meta.dependent_requests);
 
-    run_dependent_requests(s, &acb->l2meta);
+    qemu_iovec_init(&hd_qiov, qiov->niov);
 
-    if (ret < 0) {
-        return ret;
-    }
-
-    acb->remaining_sectors -= acb->cur_nr_sectors;
-    acb->sector_num += acb->cur_nr_sectors;
-    acb->bytes_done += acb->cur_nr_sectors * 512;
-
-    if (acb->remaining_sectors == 0) {
-        /* request completed */
-        return 0;
-    }
-
-    index_in_cluster = acb->sector_num & (s->cluster_sectors - 1);
-    n_end = index_in_cluster + acb->remaining_sectors;
-    if (s->crypt_method &&
-        n_end > QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors)
-        n_end = QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors;
-
-    ret = qcow2_alloc_cluster_offset(bs, acb->sector_num << 9,
-        index_in_cluster, n_end, &acb->cur_nr_sectors, &acb->l2meta);
-    if (ret < 0) {
-        return ret;
-    }
-
-    acb->cluster_offset = acb->l2meta.cluster_offset;
-    assert((acb->cluster_offset & 511) == 0);
-
-    qemu_iovec_reset(&acb->hd_qiov);
-    qemu_iovec_copy(&acb->hd_qiov, acb->qiov, acb->bytes_done,
-        acb->cur_nr_sectors * 512);
-
-    if (s->crypt_method) {
-        if (!acb->cluster_data) {
-            acb->cluster_data = g_malloc0(QCOW_MAX_CRYPT_CLUSTERS *
-                                             s->cluster_size);
-        }
-
-        assert(acb->hd_qiov.size <= QCOW_MAX_CRYPT_CLUSTERS * s->cluster_size);
-        qemu_iovec_to_buffer(&acb->hd_qiov, acb->cluster_data);
-
-        qcow2_encrypt_sectors(s, acb->sector_num, acb->cluster_data,
-            acb->cluster_data, acb->cur_nr_sectors, 1, &s->aes_encrypt_key);
-
-        qemu_iovec_reset(&acb->hd_qiov);
-        qemu_iovec_add(&acb->hd_qiov, acb->cluster_data,
-            acb->cur_nr_sectors * 512);
-    }
-
-    BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
-    qemu_co_mutex_unlock(&s->lock);
-    ret = bdrv_co_writev(bs->file,
-                         (acb->cluster_offset >> 9) + index_in_cluster,
-                         acb->cur_nr_sectors, &acb->hd_qiov);
-    qemu_co_mutex_lock(&s->lock);
-    if (ret < 0) {
-        return ret;
-    }
-
-    return 1;
-}
-
-static int qcow2_co_writev(BlockDriverState *bs,
-                           int64_t sector_num,
-                           int nb_sectors,
-                           QEMUIOVector *qiov)
-{
-    BDRVQcowState *s = bs->opaque;
-    QCowAIOCB *acb;
-    int ret;
-
-    acb = qcow2_aio_setup(bs, sector_num, qiov, nb_sectors, NULL, NULL, 1);
     s->cluster_cache_offset = -1; /* disable compressed cache */
 
     qemu_co_mutex_lock(&s->lock);
-    do {
-        ret = qcow2_aio_write_cb(acb);
-    } while (ret > 0);
+
+    while (remaining_sectors != 0) {
+
+        index_in_cluster = sector_num & (s->cluster_sectors - 1);
+        n_end = index_in_cluster + remaining_sectors;
+        if (s->crypt_method &&
+            n_end > QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors) {
+            n_end = QCOW_MAX_CRYPT_CLUSTERS * s->cluster_sectors;
+        }
+
+        ret = qcow2_alloc_cluster_offset(bs, sector_num << 9,
+            index_in_cluster, n_end, &cur_nr_sectors, &l2meta);
+        if (ret < 0) {
+            goto fail;
+        }
+
+        cluster_offset = l2meta.cluster_offset;
+        assert((cluster_offset & 511) == 0);
+
+        qemu_iovec_reset(&hd_qiov);
+        qemu_iovec_copy(&hd_qiov, qiov, bytes_done,
+            cur_nr_sectors * 512);
+
+        if (s->crypt_method) {
+            if (!cluster_data) {
+                cluster_data = g_malloc0(QCOW_MAX_CRYPT_CLUSTERS *
+                                                 s->cluster_size);
+            }
+
+            assert(hd_qiov.size <=
+                   QCOW_MAX_CRYPT_CLUSTERS * s->cluster_size);
+            qemu_iovec_to_buffer(&hd_qiov, cluster_data);
+
+            qcow2_encrypt_sectors(s, sector_num, cluster_data,
+                cluster_data, cur_nr_sectors, 1, &s->aes_encrypt_key);
+
+            qemu_iovec_reset(&hd_qiov);
+            qemu_iovec_add(&hd_qiov, cluster_data,
+                cur_nr_sectors * 512);
+        }
+
+        BLKDBG_EVENT(bs->file, BLKDBG_WRITE_AIO);
+        qemu_co_mutex_unlock(&s->lock);
+        ret = bdrv_co_writev(bs->file,
+                             (cluster_offset >> 9) + index_in_cluster,
+                             cur_nr_sectors, &hd_qiov);
+        qemu_co_mutex_lock(&s->lock);
+        if (ret < 0) {
+            goto fail;
+        }
+
+        ret = qcow2_alloc_cluster_link_l2(bs, &l2meta);
+
+        run_dependent_requests(s, &l2meta);
+
+        if (ret < 0) {
+            goto fail;
+        }
+
+        remaining_sectors -= cur_nr_sectors;
+        sector_num += cur_nr_sectors;
+        bytes_done += cur_nr_sectors * 512;
+    }
+    ret = 0;
+
+fail:
     qemu_co_mutex_unlock(&s->lock);
 
-    qemu_iovec_destroy(&acb->hd_qiov);
-    qemu_aio_release(acb);
+    qemu_iovec_destroy(&hd_qiov);
+    g_free(cluster_data);
 
     return ret;
 }
diff --git a/block/qcow2.h b/block/qcow2.h
index de23abe..c8ca3bc 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -189,8 +189,6 @@
 void qcow2_free_any_clusters(BlockDriverState *bs,
     uint64_t cluster_offset, int nb_clusters);
 
-void qcow2_create_refcount_update(QCowCreateState *s, int64_t offset,
-    int64_t size);
 int qcow2_update_snapshot_refcount(BlockDriverState *bs,
     int64_t l1_table_offset, int l1_size, int addend);
 
diff --git a/block/sheepdog.c b/block/sheepdog.c
index 57b6e1a..c1f6e07 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -274,7 +274,7 @@
     int ret;
     enum AIOCBState aiocb_type;
 
-    QEMUBH *bh;
+    Coroutine *coroutine;
     void (*aio_done_func)(SheepdogAIOCB *);
 
     int canceled;
@@ -295,6 +295,10 @@
     char *port;
     int fd;
 
+    CoMutex lock;
+    Coroutine *co_send;
+    Coroutine *co_recv;
+
     uint32_t aioreq_seq_num;
     QLIST_HEAD(outstanding_aio_head, AIOReq) outstanding_aio_head;
 } BDRVSheepdogState;
@@ -346,19 +350,16 @@
 /*
  * Sheepdog I/O handling:
  *
- * 1. In the sd_aio_readv/writev, read/write requests are added to the
- *    QEMU Bottom Halves.
+ * 1. In sd_co_rw_vector, we send the I/O requests to the server and
+ *    link the requests to the outstanding_aio_head list in the
+ *    BDRVSheepdogState.  The function returns without waiting for
+ *    the response.
  *
- * 2. In sd_readv_writev_bh_cb, the callbacks of BHs, we send the I/O
- *    requests to the server and link the requests to the
- *    outstanding_list in the BDRVSheepdogState.  we exits the
- *    function without waiting for receiving the response.
- *
- * 3. We receive the response in aio_read_response, the fd handler to
+ * 2. We receive the response in aio_read_response, the fd handler to
  *    the sheepdog connection.  If metadata update is needed, we send
  *    the write request to the vdi object in sd_write_done, the write
- *    completion function.  The AIOCB callback is not called until all
- *    the requests belonging to the AIOCB are finished.
+ *    completion function.  We switch back to sd_co_readv/writev after
+ *    all the requests belonging to the AIOCB are finished.
  */
 
 static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
@@ -398,7 +399,7 @@
 static void sd_finish_aiocb(SheepdogAIOCB *acb)
 {
     if (!acb->canceled) {
-        acb->common.cb(acb->common.opaque, acb->ret);
+        qemu_coroutine_enter(acb->coroutine, NULL);
     }
     qemu_aio_release(acb);
 }
@@ -411,7 +412,8 @@
      * Sheepdog cannot cancel the requests which are already sent to
      * the servers, so we just complete the request with -EIO here.
      */
-    acb->common.cb(acb->common.opaque, -EIO);
+    acb->ret = -EIO;
+    qemu_coroutine_enter(acb->coroutine, NULL);
     acb->canceled = 1;
 }
 
@@ -435,24 +437,12 @@
 
     acb->aio_done_func = NULL;
     acb->canceled = 0;
-    acb->bh = NULL;
+    acb->coroutine = qemu_coroutine_self();
     acb->ret = 0;
     QLIST_INIT(&acb->aioreq_head);
     return acb;
 }
 
-static int sd_schedule_bh(QEMUBHFunc *cb, SheepdogAIOCB *acb)
-{
-    if (acb->bh) {
-        error_report("bug: %d %d", acb->aiocb_type, acb->aiocb_type);
-        return -EIO;
-    }
-
-    acb->bh = qemu_bh_new(cb, acb);
-    qemu_bh_schedule(acb->bh);
-    return 0;
-}
-
 #ifdef _WIN32
 
 struct msghdr {
@@ -635,7 +625,13 @@
 again:
     ret = do_send_recv(sockfd, iov, len, iov_offset, write);
     if (ret < 0) {
-        if (errno == EINTR || errno == EAGAIN) {
+        if (errno == EINTR) {
+            goto again;
+        }
+        if (errno == EAGAIN) {
+            if (qemu_in_coroutine()) {
+                qemu_coroutine_yield();
+            }
             goto again;
         }
         error_report("failed to recv a rsp, %s", strerror(errno));
@@ -793,14 +789,14 @@
     unsigned long idx;
 
     if (QLIST_EMPTY(&s->outstanding_aio_head)) {
-        return;
+        goto out;
     }
 
     /* read a header */
     ret = do_read(fd, &rsp, sizeof(rsp));
     if (ret) {
         error_report("failed to get the header, %s", strerror(errno));
-        return;
+        goto out;
     }
 
     /* find the right aio_req from the outstanding_aio list */
@@ -811,7 +807,7 @@
     }
     if (!aio_req) {
         error_report("cannot find aio_req %x", rsp.id);
-        return;
+        goto out;
     }
 
     acb = aio_req->aiocb;
@@ -847,7 +843,7 @@
                        aio_req->iov_offset);
         if (ret) {
             error_report("failed to get the data, %s", strerror(errno));
-            return;
+            goto out;
         }
         break;
     }
@@ -861,10 +857,30 @@
     if (!rest) {
         /*
          * We've finished all requests which belong to the AIOCB, so
-         * we can call the callback now.
+         * we can switch back to sd_co_readv/writev now.
          */
         acb->aio_done_func(acb);
     }
+out:
+    s->co_recv = NULL;
+}
+
+static void co_read_response(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    if (!s->co_recv) {
+        s->co_recv = qemu_coroutine_create(aio_read_response);
+    }
+
+    qemu_coroutine_enter(s->co_recv, opaque);
+}
+
+static void co_write_request(void *opaque)
+{
+    BDRVSheepdogState *s = opaque;
+
+    qemu_coroutine_enter(s->co_send, NULL);
 }
 
 static int aio_flush_request(void *opaque)
@@ -924,7 +940,7 @@
         return -1;
     }
 
-    qemu_aio_set_fd_handler(fd, aio_read_response, NULL, aio_flush_request,
+    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request,
                             NULL, s);
     return fd;
 }
@@ -1091,6 +1107,10 @@
 
     hdr.id = aio_req->id;
 
+    qemu_co_mutex_lock(&s->lock);
+    s->co_send = qemu_coroutine_self();
+    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
+                            aio_flush_request, NULL, s);
     set_cork(s->fd, 1);
 
     /* send a header */
@@ -1109,6 +1129,9 @@
     }
 
     set_cork(s->fd, 0);
+    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
+                            aio_flush_request, NULL, s);
+    qemu_co_mutex_unlock(&s->lock);
 
     return 0;
 }
@@ -1225,6 +1248,7 @@
 
     bs->total_sectors = s->inode.vdi_size / SECTOR_SIZE;
     strncpy(s->name, vdi, sizeof(s->name));
+    qemu_co_mutex_init(&s->lock);
     g_free(buf);
     return 0;
 out:
@@ -1491,7 +1515,7 @@
 /*
  * This function is called after writing data objects.  If we need to
  * update metadata, this sends a write request to the vdi object.
- * Otherwise, this calls the AIOCB callback.
+ * Otherwise, this switches back to sd_co_readv/writev.
  */
 static void sd_write_done(SheepdogAIOCB *acb)
 {
@@ -1587,8 +1611,11 @@
  * waiting the response.  The responses are received in the
  * `aio_read_response' function which is called from the main loop as
  * a fd handler.
+ *
+ * Returns 1 when we need to wait for a response, 0 when no request
+ * was sent, and -errno in error cases.
  */
-static void sd_readv_writev_bh_cb(void *p)
+static int sd_co_rw_vector(void *p)
 {
     SheepdogAIOCB *acb = p;
     int ret = 0;
@@ -1600,9 +1627,6 @@
     SheepdogInode *inode = &s->inode;
     AIOReq *aio_req;
 
-    qemu_bh_delete(acb->bh);
-    acb->bh = NULL;
-
     if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
         /*
          * In the case we open the snapshot VDI, Sheepdog creates the
@@ -1684,42 +1708,47 @@
     }
 out:
     if (QLIST_EMPTY(&acb->aioreq_head)) {
-        sd_finish_aiocb(acb);
+        return acb->ret;
     }
+    return 1;
 }
 
-static BlockDriverAIOCB *sd_aio_writev(BlockDriverState *bs, int64_t sector_num,
-                                       QEMUIOVector *qiov, int nb_sectors,
-                                       BlockDriverCompletionFunc *cb,
-                                       void *opaque)
+static int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
+                        int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
+    int ret;
 
     if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
         /* TODO: shouldn't block here */
         if (sd_truncate(bs, (sector_num + nb_sectors) * SECTOR_SIZE) < 0) {
-            return NULL;
+            return -EIO;
         }
         bs->total_sectors = sector_num + nb_sectors;
     }
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aio_done_func = sd_write_done;
     acb->aiocb_type = AIOCB_WRITE_UDATA;
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
-static BlockDriverAIOCB *sd_aio_readv(BlockDriverState *bs, int64_t sector_num,
-                                      QEMUIOVector *qiov, int nb_sectors,
-                                      BlockDriverCompletionFunc *cb,
-                                      void *opaque)
+static int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
+                       int nb_sectors, QEMUIOVector *qiov)
 {
     SheepdogAIOCB *acb;
-    int i;
+    int i, ret;
 
-    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, cb, opaque);
+    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors, NULL, NULL);
     acb->aiocb_type = AIOCB_READ_UDATA;
     acb->aio_done_func = sd_finish_aiocb;
 
@@ -1731,8 +1760,15 @@
         memset(qiov->iov[i].iov_base, 0, qiov->iov[i].iov_len);
     }
 
-    sd_schedule_bh(sd_readv_writev_bh_cb, acb);
-    return &acb->common;
+    ret = sd_co_rw_vector(acb);
+    if (ret <= 0) {
+        qemu_aio_release(acb);
+        return ret;
+    }
+
+    qemu_coroutine_yield();
+
+    return acb->ret;
 }
 
 static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
@@ -2062,8 +2098,8 @@
     .bdrv_getlength = sd_getlength,
     .bdrv_truncate  = sd_truncate,
 
-    .bdrv_aio_readv     = sd_aio_readv,
-    .bdrv_aio_writev    = sd_aio_writev,
+    .bdrv_co_readv  = sd_co_readv,
+    .bdrv_co_writev = sd_co_writev,
 
     .bdrv_snapshot_create   = sd_snapshot_create,
     .bdrv_snapshot_goto     = sd_snapshot_goto,
diff --git a/block_int.h b/block_int.h
index f6d02b3..8a72b80 100644
--- a/block_int.h
+++ b/block_int.h
@@ -28,6 +28,7 @@
 #include "qemu-option.h"
 #include "qemu-queue.h"
 #include "qemu-coroutine.h"
+#include "qemu-timer.h"
 
 #define BLOCK_FLAG_ENCRYPT	1
 #define BLOCK_FLAG_COMPAT6	4
@@ -184,10 +185,9 @@
     void *sync_aiocb;
 
     /* I/O stats (display with "info blockstats"). */
-    uint64_t rd_bytes;
-    uint64_t wr_bytes;
-    uint64_t rd_ops;
-    uint64_t wr_ops;
+    uint64_t nr_bytes[BDRV_MAX_IOTYPE];
+    uint64_t nr_ops[BDRV_MAX_IOTYPE];
+    uint64_t total_time_ns[BDRV_MAX_IOTYPE];
     uint64_t wr_highest_sector;
 
     /* Whether the disk can expand beyond total_sectors */
diff --git a/blockdev.c b/blockdev.c
index d272659..2602591 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -321,18 +321,9 @@
     }
 
     if ((buf = qemu_opt_get(opts, "cache")) != NULL) {
-        if (!strcmp(buf, "off") || !strcmp(buf, "none")) {
-            bdrv_flags |= BDRV_O_NOCACHE | BDRV_O_CACHE_WB;
-        } else if (!strcmp(buf, "writeback")) {
-            bdrv_flags |= BDRV_O_CACHE_WB;
-        } else if (!strcmp(buf, "unsafe")) {
-            bdrv_flags |= BDRV_O_CACHE_WB;
-            bdrv_flags |= BDRV_O_NO_FLUSH;
-        } else if (!strcmp(buf, "writethrough")) {
-            /* this is the default */
-        } else {
-           error_report("invalid cache option");
-           return NULL;
+        if (bdrv_parse_cache_flags(buf, &bdrv_flags) != 0) {
+            error_report("invalid cache option");
+            return NULL;
         }
     }
 
diff --git a/hw/ide/ahci.c b/hw/ide/ahci.c
index 29521ba..f4fa154 100644
--- a/hw/ide/ahci.c
+++ b/hw/ide/ahci.c
@@ -710,6 +710,7 @@
     DPRINTF(ncq_tfs->drive->port_no, "NCQ transfer tag %d finished\n",
             ncq_tfs->tag);
 
+    bdrv_acct_done(ncq_tfs->drive->port.ifs[0].bs, &ncq_tfs->acct);
     qemu_sglist_destroy(&ncq_tfs->sglist);
     ncq_tfs->used = 0;
 }
@@ -756,6 +757,10 @@
             ncq_tfs->is_read = 1;
 
             DPRINTF(port, "tag %d aio read %ld\n", ncq_tfs->tag, ncq_tfs->lba);
+
+            bdrv_acct_start(ncq_tfs->drive->port.ifs[0].bs, &ncq_tfs->acct,
+                            (ncq_tfs->sector_count-1) * BDRV_SECTOR_SIZE,
+                            BDRV_ACCT_READ);
             ncq_tfs->aiocb = dma_bdrv_read(ncq_tfs->drive->port.ifs[0].bs,
                                            &ncq_tfs->sglist, ncq_tfs->lba,
                                            ncq_cb, ncq_tfs);
@@ -766,6 +771,10 @@
             ncq_tfs->is_read = 0;
 
             DPRINTF(port, "tag %d aio write %ld\n", ncq_tfs->tag, ncq_tfs->lba);
+
+            bdrv_acct_start(ncq_tfs->drive->port.ifs[0].bs, &ncq_tfs->acct,
+                            (ncq_tfs->sector_count-1) * BDRV_SECTOR_SIZE,
+                            BDRV_ACCT_WRITE);
             ncq_tfs->aiocb = dma_bdrv_write(ncq_tfs->drive->port.ifs[0].bs,
                                             &ncq_tfs->sglist, ncq_tfs->lba,
                                             ncq_cb, ncq_tfs);
diff --git a/hw/ide/ahci.h b/hw/ide/ahci.h
index e456193..832539c 100644
--- a/hw/ide/ahci.h
+++ b/hw/ide/ahci.h
@@ -258,6 +258,7 @@
     AHCIDevice *drive;
     BlockDriverAIOCB *aiocb;
     QEMUSGList sglist;
+    BlockAcctCookie acct;
     int is_read;
     uint16_t sector_count;
     uint64_t lba;
diff --git a/hw/ide/atapi.c b/hw/ide/atapi.c
index fe2fb0b..c552320 100644
--- a/hw/ide/atapi.c
+++ b/hw/ide/atapi.c
@@ -104,17 +104,20 @@
     memset(buf, 0, 288);
 }
 
-static int cd_read_sector(BlockDriverState *bs, int lba, uint8_t *buf,
-                           int sector_size)
+static int cd_read_sector(IDEState *s, int lba, uint8_t *buf, int sector_size)
 {
     int ret;
 
     switch(sector_size) {
     case 2048:
-        ret = bdrv_read(bs, (int64_t)lba << 2, buf, 4);
+        bdrv_acct_start(s->bs, &s->acct, 4 * BDRV_SECTOR_SIZE, BDRV_ACCT_READ);
+        ret = bdrv_read(s->bs, (int64_t)lba << 2, buf, 4);
+        bdrv_acct_done(s->bs, &s->acct);
         break;
     case 2352:
-        ret = bdrv_read(bs, (int64_t)lba << 2, buf + 16, 4);
+        bdrv_acct_start(s->bs, &s->acct, 4 * BDRV_SECTOR_SIZE, BDRV_ACCT_READ);
+        ret = bdrv_read(s->bs, (int64_t)lba << 2, buf + 16, 4);
+        bdrv_acct_done(s->bs, &s->acct);
         if (ret < 0)
             return ret;
         cd_data_to_raw(buf, lba);
@@ -181,7 +184,7 @@
     } else {
         /* see if a new sector must be read */
         if (s->lba != -1 && s->io_buffer_index >= s->cd_sector_size) {
-            ret = cd_read_sector(s->bs, s->lba, s->io_buffer, s->cd_sector_size);
+            ret = cd_read_sector(s, s->lba, s->io_buffer, s->cd_sector_size);
             if (ret < 0) {
                 ide_transfer_stop(s);
                 ide_atapi_io_error(s, ret);
@@ -250,6 +253,7 @@
     s->io_buffer_index = 0;
 
     if (s->atapi_dma) {
+        bdrv_acct_start(s->bs, &s->acct, size, BDRV_ACCT_READ);
         s->status = READY_STAT | SEEK_STAT | DRQ_STAT;
         s->bus->dma->ops->start_dma(s->bus->dma, s,
                                    ide_atapi_cmd_read_dma_cb);
@@ -322,10 +326,7 @@
         s->status = READY_STAT | SEEK_STAT;
         s->nsector = (s->nsector & ~7) | ATAPI_INT_REASON_IO | ATAPI_INT_REASON_CD;
         ide_set_irq(s->bus);
-    eot:
-        s->bus->dma->ops->add_status(s->bus->dma, BM_STATUS_INT);
-        ide_set_inactive(s);
-        return;
+        goto eot;
     }
 
     s->io_buffer_index = 0;
@@ -343,9 +344,11 @@
 #ifdef DEBUG_AIO
     printf("aio_read_cd: lba=%u n=%d\n", s->lba, n);
 #endif
+
     s->bus->dma->iov.iov_base = (void *)(s->io_buffer + data_offset);
     s->bus->dma->iov.iov_len = n * 4 * 512;
     qemu_iovec_init_external(&s->bus->dma->qiov, &s->bus->dma->iov, 1);
+
     s->bus->dma->aiocb = bdrv_aio_readv(s->bs, (int64_t)s->lba << 2,
                                        &s->bus->dma->qiov, n * 4,
                                        ide_atapi_cmd_read_dma_cb, s);
@@ -355,6 +358,12 @@
                             ASC_MEDIUM_NOT_PRESENT);
         goto eot;
     }
+
+    return;
+eot:
+    bdrv_acct_done(s->bs, &s->acct);
+    s->bus->dma->ops->add_status(s->bus->dma, BM_STATUS_INT);
+    ide_set_inactive(s);
 }
 
 /* start a CD-CDROM read command with DMA */
@@ -368,6 +377,8 @@
     s->io_buffer_size = 0;
     s->cd_sector_size = sector_size;
 
+    bdrv_acct_start(s->bs, &s->acct, s->packet_transfer_size, BDRV_ACCT_READ);
+
     /* XXX: check if BUSY_STAT should be set */
     s->status = READY_STAT | SEEK_STAT | DRQ_STAT | BUSY_STAT;
     s->bus->dma->ops->start_dma(s->bus->dma, s,
diff --git a/hw/ide/core.c b/hw/ide/core.c
index d145b19..40abc1e 100644
--- a/hw/ide/core.c
+++ b/hw/ide/core.c
@@ -473,7 +473,10 @@
 #endif
         if (n > s->req_nb_sectors)
             n = s->req_nb_sectors;
+
+        bdrv_acct_start(s->bs, &s->acct, n * BDRV_SECTOR_SIZE, BDRV_ACCT_READ);
         ret = bdrv_read(s->bs, sector_num, s->io_buffer, n);
+        bdrv_acct_done(s->bs, &s->acct);
         if (ret != 0) {
             if (ide_handle_rw_error(s, -ret,
                 BM_STATUS_PIO_RETRY | BM_STATUS_RETRY_READ))
@@ -610,7 +613,10 @@
     return;
 
 eot:
-   ide_set_inactive(s);
+    if (s->dma_cmd == IDE_DMA_READ || s->dma_cmd == IDE_DMA_WRITE) {
+        bdrv_acct_done(s->bs, &s->acct);
+    }
+    ide_set_inactive(s);
 }
 
 static void ide_sector_start_dma(IDEState *s, enum ide_dma_cmd dma_cmd)
@@ -619,6 +625,20 @@
     s->io_buffer_index = 0;
     s->io_buffer_size = 0;
     s->dma_cmd = dma_cmd;
+
+    switch (dma_cmd) {
+    case IDE_DMA_READ:
+        bdrv_acct_start(s->bs, &s->acct, s->nsector * BDRV_SECTOR_SIZE,
+                        BDRV_ACCT_READ);
+        break;
+    case IDE_DMA_WRITE:
+        bdrv_acct_start(s->bs, &s->acct, s->nsector * BDRV_SECTOR_SIZE,
+                        BDRV_ACCT_WRITE);
+        break;
+    default:
+        break;
+    }
+
     s->bus->dma->ops->start_dma(s->bus->dma, s, ide_dma_cb);
 }
 
@@ -641,7 +661,10 @@
     n = s->nsector;
     if (n > s->req_nb_sectors)
         n = s->req_nb_sectors;
+    /* This is a PIO write: account it as BDRV_ACCT_WRITE, not as a read */
+    bdrv_acct_start(s->bs, &s->acct, n * BDRV_SECTOR_SIZE, BDRV_ACCT_WRITE);
     ret = bdrv_write(s->bs, sector_num, s->io_buffer, n);
+    bdrv_acct_done(s->bs, &s->acct);
 
     if (ret != 0) {
         if (ide_handle_rw_error(s, -ret, BM_STATUS_PIO_RETRY))
@@ -685,6 +708,7 @@
         }
     }
 
+    bdrv_acct_done(s->bs, &s->acct);
     s->status = READY_STAT | SEEK_STAT;
     ide_set_irq(s->bus);
 }
@@ -698,6 +722,7 @@
         return;
     }
 
+    bdrv_acct_start(s->bs, &s->acct, 0, BDRV_ACCT_FLUSH);
     acb = bdrv_aio_flush(s->bs, ide_flush_cb, s);
     if (acb == NULL) {
         ide_flush_cb(s, -EIO);
diff --git a/hw/ide/internal.h b/hw/ide/internal.h
index 02e805f..7f5ef8d 100644
--- a/hw/ide/internal.h
+++ b/hw/ide/internal.h
@@ -440,6 +440,7 @@
     int lba;
     int cd_sector_size;
     int atapi_dma; /* true if dma is requested for the packet cmd */
+    BlockAcctCookie acct;
     /* ATA DMA state */
     int io_buffer_size;
     QEMUSGList sg;
diff --git a/hw/ide/macio.c b/hw/ide/macio.c
index 44fb3fe..fdf5d75 100644
--- a/hw/ide/macio.c
+++ b/hw/ide/macio.c
@@ -52,8 +52,7 @@
         m->aiocb = NULL;
         qemu_sglist_destroy(&s->sg);
         ide_atapi_io_error(s, ret);
-        io->dma_end(opaque);
-        return;
+        goto done;
     }
 
     if (s->io_buffer_size > 0) {
@@ -71,8 +70,7 @@
         ide_atapi_cmd_ok(s);
 
     if (io->len == 0) {
-        io->dma_end(opaque);
-        return;
+        goto done;
     }
 
     /* launch next transfer */
@@ -92,9 +90,14 @@
         /* Note: media not present is the most likely case */
         ide_atapi_cmd_error(s, SENSE_NOT_READY,
                             ASC_MEDIUM_NOT_PRESENT);
-        io->dma_end(opaque);
-        return;
+        goto done;
     }
+    return;
+
+done:
+    bdrv_acct_done(s->bs, &s->acct);
+    io->dma_end(opaque);
+    return;
 }
 
 static void pmac_ide_transfer_cb(void *opaque, int ret)
@@ -109,8 +112,7 @@
         m->aiocb = NULL;
         qemu_sglist_destroy(&s->sg);
 	ide_dma_error(s);
-        io->dma_end(io);
-        return;
+        goto done;
     }
 
     sector_num = ide_get_sector(s);
@@ -130,10 +132,8 @@
     }
 
     /* end of DMA ? */
-
     if (io->len == 0) {
-        io->dma_end(io);
-	return;
+        goto done;
     }
 
     /* launch next transfer */
@@ -163,6 +163,12 @@
 
     if (!m->aiocb)
         pmac_ide_transfer_cb(io, -1);
+    return;
+done:
+    if (s->dma_cmd == IDE_DMA_READ || s->dma_cmd == IDE_DMA_WRITE) {
+        bdrv_acct_done(s->bs, &s->acct);
+    }
+    io->dma_end(io);
 }
 
 static void pmac_ide_transfer(DBDMA_io *io)
@@ -172,10 +178,22 @@
 
     s->io_buffer_size = 0;
     if (s->drive_kind == IDE_CD) {
+        bdrv_acct_start(s->bs, &s->acct, io->len, BDRV_ACCT_READ);
         pmac_ide_atapi_transfer_cb(io, 0);
         return;
     }
 
+    switch (s->dma_cmd) {
+    case IDE_DMA_READ:
+        bdrv_acct_start(s->bs, &s->acct, io->len, BDRV_ACCT_READ);
+        break;
+    case IDE_DMA_WRITE:
+        bdrv_acct_start(s->bs, &s->acct, io->len, BDRV_ACCT_WRITE);
+        break;
+    default:
+        break;
+    }
+
     pmac_ide_transfer_cb(io, 0);
 }
 
diff --git a/hw/scsi-disk.c b/hw/scsi-disk.c
index d94b1eb..3cc830f 100644
--- a/hw/scsi-disk.c
+++ b/hw/scsi-disk.c
@@ -57,6 +57,7 @@
     struct iovec iov;
     QEMUIOVector qiov;
     uint32_t status;
+    BlockAcctCookie acct;
 } SCSIDiskReq;
 
 struct SCSIDiskState
@@ -107,10 +108,13 @@
 static void scsi_read_complete(void * opaque, int ret)
 {
     SCSIDiskReq *r = (SCSIDiskReq *)opaque;
+    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
     int n;
 
     r->req.aiocb = NULL;
 
+    bdrv_acct_done(s->bs, &r->acct);
+
     if (ret) {
         if (scsi_handle_rw_error(r, -ret, SCSI_REQ_STATUS_RETRY_READ)) {
             return;
@@ -161,6 +165,8 @@
 
     r->iov.iov_len = n * 512;
     qemu_iovec_init_external(&r->qiov, &r->iov, 1);
+
+    bdrv_acct_start(s->bs, &r->acct, n * BDRV_SECTOR_SIZE, BDRV_ACCT_READ);
     r->req.aiocb = bdrv_aio_readv(s->bs, r->sector, &r->qiov, n,
                               scsi_read_complete, r);
     if (r->req.aiocb == NULL) {
@@ -207,11 +213,14 @@
 static void scsi_write_complete(void * opaque, int ret)
 {
     SCSIDiskReq *r = (SCSIDiskReq *)opaque;
+    SCSIDiskState *s = DO_UPCAST(SCSIDiskState, qdev, r->req.dev);
     uint32_t len;
     uint32_t n;
 
     r->req.aiocb = NULL;
 
+    bdrv_acct_done(s->bs, &r->acct);
+
     if (ret) {
         if (scsi_handle_rw_error(r, -ret, SCSI_REQ_STATUS_RETRY_WRITE)) {
             return;
@@ -252,6 +261,8 @@
     n = r->iov.iov_len / 512;
     if (n) {
         qemu_iovec_init_external(&r->qiov, &r->iov, 1);
+
+        bdrv_acct_start(s->bs, &r->acct, n * BDRV_SECTOR_SIZE, BDRV_ACCT_WRITE);
         r->req.aiocb = bdrv_aio_writev(s->bs, r->sector, &r->qiov, n,
                                    scsi_write_complete, r);
         if (r->req.aiocb == NULL) {
@@ -854,13 +865,19 @@
         buflen = 8;
         break;
     case SYNCHRONIZE_CACHE:
+    {
+        BlockAcctCookie acct;
+
+        bdrv_acct_start(s->bs, &acct, 0, BDRV_ACCT_FLUSH);
         ret = bdrv_flush(s->bs);
+        bdrv_acct_done(s->bs, &acct);
         if (ret < 0) {
             if (scsi_handle_rw_error(r, -ret, SCSI_REQ_STATUS_RETRY_FLUSH)) {
                 return -1;
             }
         }
         break;
+    }
     case GET_CONFIGURATION:
         memset(outbuf, 0, 8);
         /* ??? This should probably return much more information.  For now
diff --git a/hw/virtio-blk.c b/hw/virtio-blk.c
index dad8c0a..2a8ccd0 100644
--- a/hw/virtio-blk.c
+++ b/hw/virtio-blk.c
@@ -47,6 +47,7 @@
     struct virtio_scsi_inhdr *scsi;
     QEMUIOVector qiov;
     struct VirtIOBlockReq *next;
+    BlockAcctCookie acct;
 } VirtIOBlockReq;
 
 static void virtio_blk_req_complete(VirtIOBlockReq *req, int status)
@@ -58,8 +59,6 @@
     stb_p(&req->in->status, status);
     virtqueue_push(s->vq, &req->elem, req->qiov.size + sizeof(*req->in));
     virtio_notify(&s->vdev, s->vq);
-
-    g_free(req);
 }
 
 static int virtio_blk_handle_rw_error(VirtIOBlockReq *req, int error,
@@ -81,6 +80,8 @@
         vm_stop(VMSTOP_DISKFULL);
     } else {
         virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
+        bdrv_acct_done(s->bs, &req->acct);
+        g_free(req);
         bdrv_mon_event(s->bs, BDRV_ACTION_REPORT, is_read);
     }
 
@@ -100,6 +101,8 @@
     }
 
     virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
+    bdrv_acct_done(req->dev->bs, &req->acct);
+    g_free(req);
 }
 
 static void virtio_blk_flush_complete(void *opaque, int ret)
@@ -113,6 +116,8 @@
     }
 
     virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
+    bdrv_acct_done(req->dev->bs, &req->acct);
+    g_free(req);
 }
 
 static VirtIOBlockReq *virtio_blk_alloc_request(VirtIOBlock *s)
@@ -155,6 +160,7 @@
      */
     if (req->elem.out_num < 2 || req->elem.in_num < 3) {
         virtio_blk_req_complete(req, VIRTIO_BLK_S_IOERR);
+        g_free(req);
         return;
     }
 
@@ -163,6 +169,7 @@
      */
     if (req->elem.out_num > 2 && req->elem.in_num > 3) {
         virtio_blk_req_complete(req, VIRTIO_BLK_S_UNSUPP);
+        g_free(req);
         return;
     }
 
@@ -229,11 +236,13 @@
     stl_p(&req->scsi->data_len, hdr.dxfer_len);
 
     virtio_blk_req_complete(req, status);
+    g_free(req);
 }
 #else
 static void virtio_blk_handle_scsi(VirtIOBlockReq *req)
 {
     virtio_blk_req_complete(req, VIRTIO_BLK_S_UNSUPP);
+    g_free(req);
 }
 #endif /* __linux__ */
 
@@ -266,6 +275,8 @@
 {
     BlockDriverAIOCB *acb;
 
+    bdrv_acct_start(req->dev->bs, &req->acct, 0, BDRV_ACCT_FLUSH);
+
     /*
      * Make sure all outstanding writes are posted to the backing device.
      */
@@ -284,6 +295,8 @@
 
     sector = ldq_p(&req->out->sector);
 
+    bdrv_acct_start(req->dev->bs, &req->acct, req->qiov.size, BDRV_ACCT_WRITE);
+
     trace_virtio_blk_handle_write(req, sector, req->qiov.size / 512);
 
     if (sector & req->dev->sector_mask) {
@@ -317,6 +330,8 @@
 
     sector = ldq_p(&req->out->sector);
 
+    bdrv_acct_start(req->dev->bs, &req->acct, req->qiov.size, BDRV_ACCT_READ);
+
     if (sector & req->dev->sector_mask) {
         virtio_blk_rw_complete(req, -EIO);
         return;
@@ -370,6 +385,7 @@
                 s->serial ? s->serial : "",
                 MIN(req->elem.in_sg[0].iov_len, VIRTIO_BLK_ID_BYTES));
         virtio_blk_req_complete(req, VIRTIO_BLK_S_OK);
+        g_free(req);
     } else if (type & VIRTIO_BLK_T_OUT) {
         qemu_iovec_init_external(&req->qiov, &req->elem.out_sg[1],
                                  req->elem.out_num - 1);
diff --git a/hw/xen_disk.c b/hw/xen_disk.c
index 31f9151..bd5c669 100644
--- a/hw/xen_disk.c
+++ b/hw/xen_disk.c
@@ -79,6 +79,7 @@
 
     struct XenBlkDev    *blkdev;
     QLIST_ENTRY(ioreq)   list;
+    BlockAcctCookie     acct;
 };
 
 struct XenBlkDev {
@@ -401,6 +402,7 @@
     ioreq->status = ioreq->aio_errors ? BLKIF_RSP_ERROR : BLKIF_RSP_OKAY;
     ioreq_unmap(ioreq);
     ioreq_finish(ioreq);
+    bdrv_acct_done(ioreq->blkdev->bs, &ioreq->acct);
     qemu_bh_schedule(ioreq->blkdev->bh);
 }
 
@@ -419,6 +421,7 @@
 
     switch (ioreq->req.operation) {
     case BLKIF_OP_READ:
+        bdrv_acct_start(blkdev->bs, &ioreq->acct, ioreq->v.size, BDRV_ACCT_READ);
         ioreq->aio_inflight++;
         bdrv_aio_readv(blkdev->bs, ioreq->start / BLOCK_SIZE,
                        &ioreq->v, ioreq->v.size / BLOCK_SIZE,
@@ -429,6 +432,8 @@
         if (!ioreq->req.nr_segments) {
             break;
         }
+
+        bdrv_acct_start(blkdev->bs, &ioreq->acct, ioreq->v.size, BDRV_ACCT_WRITE);
         ioreq->aio_inflight++;
         bdrv_aio_writev(blkdev->bs, ioreq->start / BLOCK_SIZE,
                         &ioreq->v, ioreq->v.size / BLOCK_SIZE,
diff --git a/posix-aio-compat.c b/posix-aio-compat.c
index babb094..3193dbf 100644
--- a/posix-aio-compat.c
+++ b/posix-aio-compat.c
@@ -30,6 +30,7 @@
 
 #include "block/raw-posix-aio.h"
 
+static void do_spawn_thread(void);
 
 struct qemu_paiocb {
     BlockDriverAIOCB common;
@@ -64,6 +65,9 @@
 static int max_threads = 64;
 static int cur_threads = 0;
 static int idle_threads = 0;
+static int new_threads = 0;     /* backlog of threads we need to create */
+static int pending_threads = 0; /* threads created but not running yet */
+static QEMUBH *new_thread_bh;
 static QTAILQ_HEAD(, qemu_paiocb) request_list;
 
 #ifdef CONFIG_PREADV
@@ -311,6 +315,11 @@
 
     pid = getpid();
 
+    mutex_lock(&lock);
+    pending_threads--;
+    mutex_unlock(&lock);
+    do_spawn_thread();
+
     while (1) {
         struct qemu_paiocb *aiocb;
         ssize_t ret = 0;
@@ -381,11 +390,20 @@
     return NULL;
 }
 
-static void spawn_thread(void)
+static void do_spawn_thread(void)
 {
     sigset_t set, oldset;
 
-    cur_threads++;
+    mutex_lock(&lock);
+    if (!new_threads) {
+        mutex_unlock(&lock);
+        return;
+    }
+
+    new_threads--;
+    pending_threads++;
+
+    mutex_unlock(&lock);
 
     /* block all signals */
     if (sigfillset(&set)) die("sigfillset");
@@ -396,6 +414,27 @@
     if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
 }
 
+static void spawn_thread_bh_fn(void *opaque)
+{
+    do_spawn_thread();
+}
+
+static void spawn_thread(void)
+{
+    cur_threads++;
+    new_threads++;
+    /* If there are threads being created, they will spawn new workers, so
+     * we don't spend time creating many threads in a loop holding a mutex or
+     * starving the current vcpu.
+     *
+     * If there are no idle threads, ask the main thread to create one, so we
+     * inherit the correct affinity instead of the vcpu affinity.
+     */
+    if (!pending_threads) {
+        qemu_bh_schedule(new_thread_bh);
+    }
+}
+
 static void qemu_paio_submit(struct qemu_paiocb *aiocb)
 {
     aiocb->ret = -EINPROGRESS;
@@ -665,6 +704,7 @@
         die2(ret, "pthread_attr_setdetachstate");
 
     QTAILQ_INIT(&request_list);
+    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
 
     posix_aio_state = s;
     return 0;
diff --git a/qemu-config.c b/qemu-config.c
index 1eb6b9a..139e077 100644
--- a/qemu-config.c
+++ b/qemu-config.c
@@ -55,7 +55,8 @@
         },{
             .name = "cache",
             .type = QEMU_OPT_STRING,
-            .help = "host cache usage (none, writeback, writethrough, unsafe)",
+            .help = "host cache usage (none, writeback, writethrough, "
+                    "directsync, unsafe)",
         },{
             .name = "aio",
             .type = QEMU_OPT_STRING,
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index a80f437..2a385a3 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -115,3 +115,47 @@
 
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
+
+void qemu_co_rwlock_init(CoRwlock *lock)
+{
+    memset(lock, 0, sizeof(*lock));
+    qemu_co_queue_init(&lock->queue);
+}
+
+void qemu_co_rwlock_rdlock(CoRwlock *lock)
+{
+    while (lock->writer) {
+        qemu_co_queue_wait(&lock->queue);
+    }
+    lock->reader++;
+}
+
+void qemu_co_rwlock_unlock(CoRwlock *lock)
+{
+    assert(qemu_in_coroutine());
+    if (lock->writer) {
+        lock->writer = false;
+        while (!qemu_co_queue_empty(&lock->queue)) {
+            /*
+             * Wake up everybody. This will include some
+             * writers too.
+             */
+            qemu_co_queue_next(&lock->queue);
+        }
+    } else {
+        lock->reader--;
+        assert(lock->reader >= 0);
+        /* Wake up only one waiting writer */
+        if (!lock->reader) {
+            qemu_co_queue_next(&lock->queue);
+        }
+    }
+}
+
+void qemu_co_rwlock_wrlock(CoRwlock *lock)
+{
+    while (lock->writer || lock->reader) {
+        qemu_co_queue_wait(&lock->queue);
+    }
+    lock->writer = true;
+}
diff --git a/qemu-coroutine.h b/qemu-coroutine.h
index 2f2fd95..b8fc4f4 100644
--- a/qemu-coroutine.h
+++ b/qemu-coroutine.h
@@ -156,4 +156,36 @@
  */
 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex);
 
+typedef struct CoRwlock {
+    bool writer;
+    int reader;
+    CoQueue queue;
+} CoRwlock;
+
+/**
+ * Initialises a CoRwlock. This must be called before any other operation
+ * is used on the CoRwlock
+ */
+void qemu_co_rwlock_init(CoRwlock *lock);
+
+/**
+ * Read locks the CoRwlock. If the lock cannot be taken immediately because
+ * of a parallel writer, control is transferred to the caller of the current
+ * coroutine.
+ */
+void qemu_co_rwlock_rdlock(CoRwlock *lock);
+
+/**
+ * Write locks the CoRwlock. If the lock cannot be taken immediately because
+ * of a parallel reader or writer, control is transferred to the caller of the
+ * current coroutine.
+ */
+void qemu_co_rwlock_wrlock(CoRwlock *lock);
+
+/**
+ * Unlocks the read/write lock and schedules the next coroutine that was
+ * waiting for this lock to be run.
+ */
+void qemu_co_rwlock_unlock(CoRwlock *lock);
+
 #endif /* QEMU_COROUTINE_H */
diff --git a/qemu-img-cmds.hx b/qemu-img-cmds.hx
index 1299e83..4be00a5 100644
--- a/qemu-img-cmds.hx
+++ b/qemu-img-cmds.hx
@@ -28,9 +28,9 @@
 ETEXI
 
 DEF("convert", img_convert,
-    "convert [-c] [-p] [-f fmt] [-t cache] [-O output_fmt] [-o options] [-s snapshot_name] filename [filename2 [...]] output_filename")
+    "convert [-c] [-p] [-f fmt] [-t cache] [-O output_fmt] [-o options] [-s snapshot_name] [-S sparse_size] filename [filename2 [...]] output_filename")
 STEXI
-@item convert [-c] [-p] [-f @var{fmt}] [-O @var{output_fmt}] [-o @var{options}] [-s @var{snapshot_name}] @var{filename} [@var{filename2} [...]] @var{output_filename}
+@item convert [-c] [-p] [-f @var{fmt}] [-O @var{output_fmt}] [-o @var{options}] [-s @var{snapshot_name}] [-S @var{sparse_size}] @var{filename} [@var{filename2} [...]] @var{output_filename}
 ETEXI
 
 DEF("info", img_info,
diff --git a/qemu-img.c b/qemu-img.c
index 95f3219..6a39731 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -66,7 +66,8 @@
            "  'filename' is a disk image filename\n"
            "  'fmt' is the disk image format. It is guessed automatically in most cases\n"
            "  'cache' is the cache mode used to write the output disk image, the valid\n"
-           "    options are: 'none', 'writeback' (default), 'writethrough' and 'unsafe'\n"
+           "    options are: 'none', 'writeback' (default), 'writethrough', 'directsync'\n"
+           "    and 'unsafe'\n"
            "  'size' is the disk image size in bytes. Optional suffixes\n"
            "    'k' or 'K' (kilobyte, 1024), 'M' (megabyte, 1024k), 'G' (gigabyte, 1024M)\n"
            "    and T (terabyte, 1024G) are supported. 'b' is ignored.\n"
@@ -81,6 +82,8 @@
            "       rebasing in this case (useful for renaming the backing file)\n"
            "  '-h' with or without a command shows this help and lists the supported formats\n"
            "  '-p' show progress of command (only certain commands)\n"
+           "  '-S' indicates the consecutive number of bytes that must contain only zeros\n"
+           "       for qemu-img to create a sparse image during conversion\n"
            "\n"
            "Parameters to snapshot subcommand:\n"
            "  'snapshot' is the name of the snapshot to create, apply or delete\n"
@@ -183,27 +186,6 @@
 }
 #endif
 
-static int set_cache_flag(const char *mode, int *flags)
-{
-    *flags &= ~BDRV_O_CACHE_MASK;
-
-    if (!strcmp(mode, "none") || !strcmp(mode, "off")) {
-        *flags |= BDRV_O_CACHE_WB;
-        *flags |= BDRV_O_NOCACHE;
-    } else if (!strcmp(mode, "writeback")) {
-        *flags |= BDRV_O_CACHE_WB;
-    } else if (!strcmp(mode, "unsafe")) {
-        *flags |= BDRV_O_CACHE_WB;
-        *flags |= BDRV_O_NO_FLUSH;
-    } else if (!strcmp(mode, "writethrough")) {
-        /* this is the default */
-    } else {
-        return -1;
-    }
-
-    return 0;
-}
-
 static int print_block_option_help(const char *filename, const char *fmt)
 {
     BlockDriver *drv, *proto_drv;
@@ -495,7 +477,7 @@
     filename = argv[optind++];
 
     flags = BDRV_O_RDWR;
-    ret = set_cache_flag(cache, &flags);
+    ret = bdrv_parse_cache_flags(cache, &flags);
     if (ret < 0) {
         error_report("Invalid cache option: %s", cache);
         return -1;
@@ -591,6 +573,48 @@
 }
 
 /*
+ * Like is_allocated_sectors, but if the buffer starts with a used sector,
+ * up to 'min' consecutive sectors containing zeros are ignored. This avoids
+ * breaking up write requests for only small sparse areas.
+ */
+static int is_allocated_sectors_min(const uint8_t *buf, int n, int *pnum,
+    int min)
+{
+    int ret;
+    int num_checked, num_used;
+
+    if (n < min) {
+        min = n;
+    }
+
+    ret = is_allocated_sectors(buf, n, pnum);
+    if (!ret) {
+        return ret;
+    }
+
+    num_used = *pnum;
+    buf += BDRV_SECTOR_SIZE * *pnum;
+    n -= *pnum;
+    num_checked = num_used;
+
+    while (n > 0) {
+        ret = is_allocated_sectors(buf, n, pnum);
+
+        buf += BDRV_SECTOR_SIZE * *pnum;
+        n -= *pnum;
+        num_checked += *pnum;
+        if (ret) {
+            num_used = num_checked;
+        } else if (*pnum >= min) {
+            break;
+        }
+    }
+
+    *pnum = num_used;
+    return 1;
+}
+
+/*
  * Compares two buffers sector by sector. Returns 0 if the first sector of both
  * buffers matches, non-zero otherwise.
  *
@@ -640,6 +664,7 @@
     char *options = NULL;
     const char *snapshot_name = NULL;
     float local_progress;
+    int min_sparse = 8; /* Need at least 4k of zeros for sparse detection */
 
     fmt = NULL;
     out_fmt = "raw";
@@ -647,7 +672,7 @@
     out_baseimg = NULL;
     compress = 0;
     for(;;) {
-        c = getopt(argc, argv, "f:O:B:s:hce6o:pt:");
+        c = getopt(argc, argv, "f:O:B:s:hce6o:pS:t:");
         if (c == -1) {
             break;
         }
@@ -682,6 +707,18 @@
         case 's':
             snapshot_name = optarg;
             break;
+        case 'S':
+        {
+            int64_t sval;
+            sval = strtosz_suffix(optarg, NULL, STRTOSZ_DEFSUFFIX_B);
+            if (sval < 0) {
+                error_report("Invalid minimum zero buffer size for sparse output specified");
+                return 1;
+            }
+
+            min_sparse = sval / BDRV_SECTOR_SIZE;
+            break;
+        }
         case 'p':
             progress = 1;
             break;
@@ -819,7 +856,7 @@
     }
 
     flags = BDRV_O_RDWR;
-    ret = set_cache_flag(cache, &flags);
+    ret = bdrv_parse_cache_flags(cache, &flags);
     if (ret < 0) {
         error_report("Invalid cache option: %s", cache);
         return -1;
@@ -834,7 +871,7 @@
     bs_i = 0;
     bs_offset = 0;
     bdrv_get_geometry(bs[0], &bs_sectors);
-    buf = g_malloc(IO_BUF_SIZE);
+    buf = qemu_blockalign(out_bs, IO_BUF_SIZE);
 
     if (compress) {
         ret = bdrv_get_info(out_bs, &bdi);
@@ -890,7 +927,8 @@
 
                 ret = bdrv_read(bs[bs_i], bs_num, buf2, nlow);
                 if (ret < 0) {
-                    error_report("error while reading");
+                    error_report("error while reading sector %" PRId64 ": %s",
+                                 bs_num, strerror(-ret));
                     goto out;
                 }
 
@@ -908,8 +946,8 @@
                 ret = bdrv_write_compressed(out_bs, sector_num, buf,
                                             cluster_sectors);
                 if (ret != 0) {
-                    error_report("error while compressing sector %" PRId64,
-                          sector_num);
+                    error_report("error while compressing sector %" PRId64
+                                 ": %s", sector_num, strerror(-ret));
                     goto out;
                 }
             }
@@ -972,7 +1010,8 @@
 
             ret = bdrv_read(bs[bs_i], sector_num - bs_offset, buf, n);
             if (ret < 0) {
-                error_report("error while reading");
+                error_report("error while reading sector %" PRId64 ": %s",
+                             sector_num - bs_offset, strerror(-ret));
                 goto out;
             }
             /* NOTE: at the same time we convert, we do not write zero
@@ -988,10 +1027,11 @@
                    sectors that are entirely 0, since whatever data was
                    already there is garbage, not 0s. */
                 if (!has_zero_init || out_baseimg ||
-                    is_allocated_sectors(buf1, n, &n1)) {
+                    is_allocated_sectors_min(buf1, n, &n1, min_sparse)) {
                     ret = bdrv_write(out_bs, sector_num, buf1, n1);
                     if (ret < 0) {
-                        error_report("error while writing");
+                        error_report("error while writing sector %" PRId64
+                                     ": %s", sector_num, strerror(-ret));
                         goto out;
                     }
                 }
@@ -1006,7 +1046,7 @@
     qemu_progress_end();
     free_option_parameters(create_options);
     free_option_parameters(param);
-    g_free(buf);
+    qemu_vfree(buf);
     if (out_bs) {
         bdrv_delete(out_bs);
     }
@@ -1291,7 +1331,7 @@
     qemu_progress_print(0, 100);
 
     flags = BDRV_O_RDWR | (unsafe ? BDRV_O_NO_BACKING : 0);
-    ret = set_cache_flag(cache, &flags);
+    ret = bdrv_parse_cache_flags(cache, &flags);
     if (ret < 0) {
         error_report("Invalid cache option: %s", cache);
         return -1;
@@ -1373,8 +1413,8 @@
         uint8_t * buf_new;
         float local_progress;
 
-        buf_old = g_malloc(IO_BUF_SIZE);
-        buf_new = g_malloc(IO_BUF_SIZE);
+        buf_old = qemu_blockalign(bs, IO_BUF_SIZE);
+        buf_new = qemu_blockalign(bs, IO_BUF_SIZE);
 
         bdrv_get_geometry(bs, &num_sectors);
 
@@ -1430,8 +1470,8 @@
             qemu_progress_print(local_progress, 100);
         }
 
-        g_free(buf_old);
-        g_free(buf_new);
+        qemu_vfree(buf_old);
+        qemu_vfree(buf_new);
     }
 
     /*
diff --git a/qemu-img.texi b/qemu-img.texi
index 495a1b6..70fa321 100644
--- a/qemu-img.texi
+++ b/qemu-img.texi
@@ -40,6 +40,11 @@
 with or without a command shows help and lists the supported formats
 @item -p
 display progress bar (convert and rebase commands only)
+@item -S @var{size}
+indicates the number of consecutive bytes that must contain only zeros
+for qemu-img to create a sparse image during conversion. This value is rounded
+down to the nearest 512 bytes. You may use the common size suffixes like
+@code{k} for kilobytes.
 @end table
 
 Parameters to snapshot subcommand:
@@ -86,7 +91,7 @@
 
 Commit the changes recorded in @var{filename} in its base image.
 
-@item convert [-c] [-p] [-f @var{fmt}] [-O @var{output_fmt}] [-o @var{options}] [-s @var{snapshot_name}] @var{filename} [@var{filename2} [...]] @var{output_filename}
+@item convert [-c] [-p] [-f @var{fmt}] [-O @var{output_fmt}] [-o @var{options}] [-s @var{snapshot_name}] [-S @var{sparse_size}] @var{filename} [@var{filename2} [...]] @var{output_filename}
 
 Convert the disk image @var{filename} or a snapshot @var{snapshot_name} to disk image @var{output_filename}
 using format @var{output_fmt}. It can be optionally compressed (@code{-c}
diff --git a/qemu-options.hx b/qemu-options.hx
index d86815d..35d95d1 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -133,7 +133,7 @@
 DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "-drive [file=file][,if=type][,bus=n][,unit=m][,media=d][,index=i]\n"
     "       [,cyls=c,heads=h,secs=s[,trans=t]][,snapshot=on|off]\n"
-    "       [,cache=writethrough|writeback|none|unsafe][,format=f]\n"
+    "       [,cache=writethrough|writeback|none|directsync|unsafe][,format=f]\n"
     "       [,serial=s][,addr=A][,id=name][,aio=threads|native]\n"
     "       [,readonly=on|off]\n"
     "                use 'file' as a drive image\n", QEMU_ARCH_ALL)
@@ -164,7 +164,7 @@
 @item snapshot=@var{snapshot}
 @var{snapshot} is "on" or "off" and allows to enable snapshot for given drive (see @option{-snapshot}).
 @item cache=@var{cache}
-@var{cache} is "none", "writeback", "unsafe", or "writethrough" and controls how the host cache is used to access block data.
+@var{cache} is "none", "writeback", "unsafe", "directsync" or "writethrough" and controls how the host cache is used to access block data.
 @item aio=@var{aio}
 @var{aio} is "threads", or "native" and selects between pthread based disk I/O and native Linux AIO.
 @item format=@var{format}
@@ -199,6 +199,10 @@
 attempt to do disk IO directly to the guests memory.  QEMU may still perform
 an internal copy of the data.
 
+The host page cache can be avoided while only sending write notifications to
+the guest when the data has been reported as written by the storage subsystem
+using @option{cache=directsync}.
+
 Some block drivers perform badly with @option{cache=writethrough}, most notably,
 qcow2.  If performance is more important than correctness,
 @option{cache=writeback} should be used with qcow2.
diff --git a/qmp-commands.hx b/qmp-commands.hx
index 03f67da..27cc66e 100644
--- a/qmp-commands.hx
+++ b/qmp-commands.hx
@@ -1201,6 +1201,10 @@
     - "wr_bytes": bytes written (json-int)
     - "rd_operations": read operations (json-int)
     - "wr_operations": write operations (json-int)
+    - "flush_operations": cache flush operations (json-int)
+    - "wr_total_time_ns": total time spent on writes in nanoseconds (json-int)
+    - "rd_total_time_ns": total time spent on reads in nanoseconds (json-int)
+    - "flush_total_time_ns": total time spent on cache flushes in nanoseconds (json-int)
     - "wr_highest_offset": Highest offset of a sector written since the
                            BlockDriverState has been opened (json-int)
 - "parent": Contains recursively the statistics of the underlying
@@ -1222,6 +1226,10 @@
                   "wr_operations":751,
                   "rd_bytes":122567168,
                   "rd_operations":36772
+                  "wr_total_time_ns":313253456,
+                  "rd_total_time_ns":3465673657,
+                  "flush_total_time_ns":49653,
+                  "flush_operations":61
                }
             },
             "stats":{
@@ -1230,6 +1238,10 @@
                "wr_operations":692,
                "rd_bytes":122739200,
                "rd_operations":36604
+               "flush_operations":51,
+               "wr_total_time_ns":313253456,
+               "rd_total_time_ns":3465673657,
+               "flush_total_time_ns":49653
             }
          },
          {
@@ -1240,6 +1252,10 @@
                "wr_operations":0,
                "rd_bytes":0,
                "rd_operations":0
+               "flush_operations":0,
+               "wr_total_time_ns":0,
+               "rd_total_time_ns":0,
+               "flush_total_time_ns":0
             }
          },
          {
@@ -1250,6 +1266,10 @@
                "wr_operations":0,
                "rd_bytes":0,
                "rd_operations":0
+               "flush_operations":0,
+               "wr_total_time_ns":0,
+               "rd_total_time_ns":0,
+               "flush_total_time_ns":0
             }
          },
          {
@@ -1260,6 +1280,10 @@
                "wr_operations":0,
                "rd_bytes":0,
                "rd_operations":0
+               "flush_operations":0,
+               "wr_total_time_ns":0,
+               "rd_total_time_ns":0,
+               "flush_total_time_ns":0
             }
          }
       ]