Merge remote-tracking branch 'stefanha/block' into staging

# By Stefan Hajnoczi (14) and others
# Via Stefan Hajnoczi
* stefanha/block: (28 commits)
  blockdev: Fix up copyright and permission notice
  qemu-iotests: use -nographic in test case 007
  qemu-iotests: add tests for rebasing zero clusters
  dataplane: fix hang introduced by AioContext transition
  coroutine: use AioContext for CoQueue BH
  threadpool: drop global thread pool
  block: add bdrv_get_aio_context()
  aio: add a ThreadPool instance to AioContext
  threadpool: add thread_pool_new() and thread_pool_free()
  threadpool: move globals into struct ThreadPool
  main-loop: add qemu_get_aio_context()
  sheepdog: set io_flush handler in do_co_req
  sheepdog: use non-blocking fd in coroutine context
  qcow2: make is_allocated return true for zero clusters
  qcow2: drop unnecessary flush in qcow2_update_snapshot_refcount()
  qcow2: drop flush in update_cluster_refcount()
  qcow2: flush in qcow2_update_snapshot_refcount()
  qcow2: set L2 cache dependency in qcow2_alloc_bytes()
  qcow2: flush refcount cache correctly in qcow2_write_snapshots()
  qcow2: flush refcount cache correctly in alloc_refcount_block()
  ...
diff --git a/async.c b/async.c
index f2d47ba..90fe906 100644
--- a/async.c
+++ b/async.c
@@ -24,6 +24,7 @@
 
 #include "qemu-common.h"
 #include "block/aio.h"
+#include "block/thread-pool.h"
 #include "qemu/main-loop.h"
 
 /***********************************************************/
@@ -172,6 +173,7 @@
 {
     AioContext *ctx = (AioContext *) source;
 
+    thread_pool_free(ctx->thread_pool);
     aio_set_event_notifier(ctx, &ctx->notifier, NULL, NULL);
     event_notifier_cleanup(&ctx->notifier);
     g_array_free(ctx->pollfds, TRUE);
@@ -190,6 +192,14 @@
     return &ctx->source;
 }
 
+ThreadPool *aio_get_thread_pool(AioContext *ctx)
+{
+    if (!ctx->thread_pool) {
+        ctx->thread_pool = thread_pool_new(ctx);
+    }
+    return ctx->thread_pool;
+}
+
 void aio_notify(AioContext *ctx)
 {
     event_notifier_set(&ctx->notifier);
@@ -200,6 +210,7 @@
     AioContext *ctx;
     ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
     ctx->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+    ctx->thread_pool = NULL;
     event_notifier_init(&ctx->notifier, false);
     aio_set_event_notifier(ctx, &ctx->notifier, 
                            (EventNotifierHandler *)
diff --git a/block.c b/block.c
index 124a9eb..037e15e 100644
--- a/block.c
+++ b/block.c
@@ -665,15 +665,18 @@
 
 /*
  * Common part for opening disk images and files
+ *
+ * Removes all processed options from *options.
  */
 static int bdrv_open_common(BlockDriverState *bs, BlockDriverState *file,
-    const char *filename,
+    const char *filename, QDict *options,
     int flags, BlockDriver *drv)
 {
     int ret, open_flags;
 
     assert(drv != NULL);
     assert(bs->file == NULL);
+    assert(options == NULL || bs->options != options);
 
     trace_bdrv_open_common(bs, filename, flags, drv->format_name);
 
@@ -710,7 +713,7 @@
     } else {
         assert(file != NULL);
         bs->file = file;
-        ret = drv->bdrv_open(bs, open_flags);
+        ret = drv->bdrv_open(bs, options, open_flags);
     }
 
     if (ret < 0) {
@@ -752,7 +755,7 @@
     }
 
     bs = bdrv_new("");
-    ret = bdrv_open_common(bs, NULL, filename, flags, drv);
+    ret = bdrv_open_common(bs, NULL, filename, NULL, flags, drv);
     if (ret < 0) {
         bdrv_delete(bs);
         return ret;
@@ -788,7 +791,8 @@
     /* backing files always opened read-only */
     back_flags = bs->open_flags & ~(BDRV_O_RDWR | BDRV_O_SNAPSHOT);
 
-    ret = bdrv_open(bs->backing_hd, backing_filename, back_flags, back_drv);
+    ret = bdrv_open(bs->backing_hd, backing_filename, NULL,
+                    back_flags, back_drv);
     if (ret < 0) {
         bdrv_delete(bs->backing_hd);
         bs->backing_hd = NULL;
@@ -800,15 +804,29 @@
 
 /*
  * Opens a disk image (raw, qcow2, vmdk, ...)
+ *
+ * options is a QDict of options to pass to the block drivers, or NULL for an
+ * empty set of options. The reference to the QDict belongs to the block layer
+ * after the call (even on failure), so if the caller intends to reuse the
+ * dictionary, it needs to use QINCREF() before calling bdrv_open.
  */
-int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
-              BlockDriver *drv)
+int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options,
+              int flags, BlockDriver *drv)
 {
     int ret;
     /* TODO: extra byte is a hack to ensure MAX_PATH space on Windows. */
     char tmp_filename[PATH_MAX + 1];
     BlockDriverState *file = NULL;
 
+    /* NULL means an empty set of options */
+    if (options == NULL) {
+        options = qdict_new();
+    }
+
+    bs->options = options;
+    options = qdict_clone_shallow(options);
+
+    /* For snapshot=on, create a temporary qcow2 overlay */
     if (flags & BDRV_O_SNAPSHOT) {
         BlockDriverState *bs1;
         int64_t total_size;
@@ -822,10 +840,10 @@
 
         /* if there is a backing file, use it */
         bs1 = bdrv_new("");
-        ret = bdrv_open(bs1, filename, 0, drv);
+        ret = bdrv_open(bs1, filename, NULL, 0, drv);
         if (ret < 0) {
             bdrv_delete(bs1);
-            return ret;
+            goto fail;
         }
         total_size = bdrv_getlength(bs1) & BDRV_SECTOR_MASK;
 
@@ -836,15 +854,17 @@
 
         ret = get_tmp_filename(tmp_filename, sizeof(tmp_filename));
         if (ret < 0) {
-            return ret;
+            goto fail;
         }
 
         /* Real path is meaningless for protocols */
-        if (is_protocol)
+        if (is_protocol) {
             snprintf(backing_filename, sizeof(backing_filename),
                      "%s", filename);
-        else if (!realpath(filename, backing_filename))
-            return -errno;
+        } else if (!realpath(filename, backing_filename)) {
+            ret = -errno;
+            goto fail;
+        }
 
         bdrv_qcow2 = bdrv_find_format("qcow2");
         options = parse_option_parameters("", bdrv_qcow2->create_options, NULL);
@@ -859,7 +879,7 @@
         ret = bdrv_create(bdrv_qcow2, tmp_filename, options);
         free_option_parameters(options);
         if (ret < 0) {
-            return ret;
+            goto fail;
         }
 
         filename = tmp_filename;
@@ -874,7 +894,7 @@
 
     ret = bdrv_file_open(&file, filename, bdrv_open_flags(bs, flags));
     if (ret < 0) {
-        return ret;
+        goto fail;
     }
 
     /* Find the right image format driver */
@@ -887,7 +907,7 @@
     }
 
     /* Open the image */
-    ret = bdrv_open_common(bs, file, filename, flags, drv);
+    ret = bdrv_open_common(bs, file, filename, options, flags, drv);
     if (ret < 0) {
         goto unlink_and_fail;
     }
@@ -901,11 +921,22 @@
     if ((flags & BDRV_O_NO_BACKING) == 0) {
         ret = bdrv_open_backing_file(bs);
         if (ret < 0) {
-            bdrv_close(bs);
-            return ret;
+            goto close_and_fail;
         }
     }
 
+    /* Check if any unknown options were used */
+    if (qdict_size(options) != 0) {
+        const QDictEntry *entry = qdict_first(options);
+        qerror_report(ERROR_CLASS_GENERIC_ERROR, "Block format '%s' used by "
+            "device '%s' doesn't support the option '%s'",
+            drv->format_name, bs->device_name, entry->key);
+
+        ret = -EINVAL;
+        goto close_and_fail;
+    }
+    QDECREF(options);
+
     if (!bdrv_key_required(bs)) {
         bdrv_dev_change_media_cb(bs, true);
     }
@@ -924,6 +955,15 @@
     if (bs->is_temporary) {
         unlink(filename);
     }
+fail:
+    QDECREF(bs->options);
+    QDECREF(options);
+    bs->options = NULL;
+    return ret;
+
+close_and_fail:
+    bdrv_close(bs);
+    QDECREF(options);
     return ret;
 }
 
@@ -1193,6 +1233,8 @@
         bs->valid_key = 0;
         bs->sg = 0;
         bs->growable = 0;
+        QDECREF(bs->options);
+        bs->options = NULL;
 
         if (bs->file != NULL) {
             bdrv_delete(bs->file);
@@ -3190,7 +3232,7 @@
     if (bs->file) {
         drv->bdrv_close(bs);
         ret = bdrv_snapshot_goto(bs->file, snapshot_id);
-        open_ret = drv->bdrv_open(bs, bs->open_flags);
+        open_ret = drv->bdrv_open(bs, NULL, bs->open_flags);
         if (open_ret < 0) {
             bdrv_delete(bs->file);
             bs->drv = NULL;
@@ -4594,7 +4636,8 @@
 
             bs = bdrv_new("");
 
-            ret = bdrv_open(bs, backing_file->value.s, back_flags, backing_drv);
+            ret = bdrv_open(bs, backing_file->value.s, NULL, back_flags,
+                            backing_drv);
             if (ret < 0) {
                 error_setg_errno(errp, -ret, "Could not open '%s'",
                                  backing_file->value.s);
@@ -4638,3 +4681,9 @@
         bdrv_delete(bs);
     }
 }
+
+AioContext *bdrv_get_aio_context(BlockDriverState *bs)
+{
+    /* Currently BlockDriverState always uses the main loop AioContext */
+    return qemu_get_aio_context();
+}
diff --git a/block/blkverify.c b/block/blkverify.c
index a7dd459..2086d97 100644
--- a/block/blkverify.c
+++ b/block/blkverify.c
@@ -98,7 +98,7 @@
 
     /* Open the test file */
     s->test_file = bdrv_new("");
-    ret = bdrv_open(s->test_file, filename, flags, NULL);
+    ret = bdrv_open(s->test_file, filename, NULL, flags, NULL);
     if (ret < 0) {
         bdrv_delete(s->test_file);
         s->test_file = NULL;
diff --git a/block/bochs.c b/block/bochs.c
index a6eb33d..d7078c0 100644
--- a/block/bochs.c
+++ b/block/bochs.c
@@ -108,7 +108,7 @@
     return 0;
 }
 
-static int bochs_open(BlockDriverState *bs, int flags)
+static int bochs_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVBochsState *s = bs->opaque;
     int i;
diff --git a/block/cloop.c b/block/cloop.c
index 8fe13e9..6ea7cf4 100644
--- a/block/cloop.c
+++ b/block/cloop.c
@@ -53,7 +53,7 @@
     return 0;
 }
 
-static int cloop_open(BlockDriverState *bs, int flags)
+static int cloop_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVCloopState *s = bs->opaque;
     uint32_t offsets_size, max_compressed_block_size = 1, i;
diff --git a/block/cow.c b/block/cow.c
index 4baf904..d73e08c 100644
--- a/block/cow.c
+++ b/block/cow.c
@@ -58,7 +58,7 @@
         return 0;
 }
 
-static int cow_open(BlockDriverState *bs, int flags)
+static int cow_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVCowState *s = bs->opaque;
     struct cow_header_v2 cow_header;
diff --git a/block/dmg.c b/block/dmg.c
index 6d85801..c1066df 100644
--- a/block/dmg.c
+++ b/block/dmg.c
@@ -85,7 +85,7 @@
     return 0;
 }
 
-static int dmg_open(BlockDriverState *bs, int flags)
+static int dmg_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVDMGState *s = bs->opaque;
     uint64_t info_begin,info_end,last_in_offset,last_out_offset;
diff --git a/block/parallels.c b/block/parallels.c
index 8688f6c..18b3ac0 100644
--- a/block/parallels.c
+++ b/block/parallels.c
@@ -68,7 +68,7 @@
     return 0;
 }
 
-static int parallels_open(BlockDriverState *bs, int flags)
+static int parallels_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVParallelsState *s = bs->opaque;
     int i;
diff --git a/block/qcow.c b/block/qcow.c
index a7135ee..f6750a5 100644
--- a/block/qcow.c
+++ b/block/qcow.c
@@ -92,7 +92,7 @@
         return 0;
 }
 
-static int qcow_open(BlockDriverState *bs, int flags)
+static int qcow_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVQcowState *s = bs->opaque;
     int len, i, shift, ret;
diff --git a/block/qcow2-cluster.c b/block/qcow2-cluster.c
index 56fccf9..d72d063 100644
--- a/block/qcow2-cluster.c
+++ b/block/qcow2-cluster.c
@@ -454,6 +454,9 @@
         *cluster_offset &= L2E_COMPRESSED_OFFSET_SIZE_MASK;
         break;
     case QCOW2_CLUSTER_ZERO:
+        if (s->qcow_version < 3) {
+            return -EIO;
+        }
         c = count_contiguous_clusters(nb_clusters, s->cluster_size,
                 &l2_table[l2_index], 0,
                 QCOW_OFLAG_COMPRESSED | QCOW_OFLAG_ZERO);
@@ -668,7 +671,7 @@
     }
 
     /* Update L2 table. */
-    if (s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS) {
+    if (s->use_lazy_refcounts) {
         qcow2_mark_dirty(bs);
     }
     if (qcow2_need_accurate_refcounts(s)) {
diff --git a/block/qcow2-refcount.c b/block/qcow2-refcount.c
index 55543ed..9bfb390 100644
--- a/block/qcow2-refcount.c
+++ b/block/qcow2-refcount.c
@@ -201,7 +201,10 @@
     *refcount_block = NULL;
 
     /* We write to the refcount table, so we might depend on L2 tables */
-    qcow2_cache_flush(bs, s->l2_table_cache);
+    ret = qcow2_cache_flush(bs, s->l2_table_cache);
+    if (ret < 0) {
+        return ret;
+    }
 
     /* Allocate the refcount block itself and mark it as used */
     int64_t new_block = alloc_clusters_noref(bs, s->cluster_size);
@@ -237,7 +240,10 @@
             goto fail_block;
         }
 
-        bdrv_flush(bs->file);
+        ret = qcow2_cache_flush(bs, s->refcount_block_cache);
+        if (ret < 0) {
+            goto fail_block;
+        }
 
         /* Initialize the new refcount block only after updating its refcount,
          * update_refcount uses the refcount cache itself */
@@ -526,8 +532,6 @@
         return ret;
     }
 
-    bdrv_flush(bs->file);
-
     return get_refcount(bs, cluster_index);
 }
 
@@ -663,7 +667,11 @@
         }
     }
 
-    bdrv_flush(bs->file);
+    /* The cluster refcount was incremented, either by qcow2_alloc_clusters()
+     * or explicitly by update_cluster_refcount().  Refcount blocks must be
+     * flushed before the caller's L2 table updates.
+     */
+    qcow2_cache_set_dependency(bs, s->l2_table_cache, s->refcount_block_cache);
     return offset;
 }
 
@@ -782,10 +790,6 @@
                             if (ret < 0) {
                                 goto fail;
                             }
-
-                            /* TODO Flushing once for the whole function should
-                             * be enough */
-                            bdrv_flush(bs->file);
                         }
                         /* compressed clusters are never modified */
                         refcount = 2;
@@ -841,7 +845,7 @@
         }
     }
 
-    ret = 0;
+    ret = bdrv_flush(bs);
 fail:
     if (l2_table) {
         qcow2_cache_put(bs, s->l2_table_cache, (void**) &l2_table);
diff --git a/block/qcow2-snapshot.c b/block/qcow2-snapshot.c
index eb8fcd5..992a5c8 100644
--- a/block/qcow2-snapshot.c
+++ b/block/qcow2-snapshot.c
@@ -180,11 +180,14 @@
 
     /* Allocate space for the new snapshot list */
     snapshots_offset = qcow2_alloc_clusters(bs, snapshots_size);
-    bdrv_flush(bs->file);
     offset = snapshots_offset;
     if (offset < 0) {
         return offset;
     }
+    ret = bdrv_flush(bs);
+    if (ret < 0) {
+        return ret;
+    }
 
     /* Write all snapshots to the new list */
     for(i = 0; i < s->nb_snapshots; i++) {
@@ -378,11 +381,6 @@
         goto fail;
     }
 
-    ret = bdrv_flush(bs);
-    if (ret < 0) {
-        goto fail;
-    }
-
     /* Append the new snapshot to the snapshot list */
     new_snapshot_list = g_malloc((s->nb_snapshots + 1) * sizeof(QCowSnapshot));
     if (s->snapshots) {
diff --git a/block/qcow2.c b/block/qcow2.c
index 7610e56..1f99866 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -285,11 +285,26 @@
     return ret;
 }
 
-static int qcow2_open(BlockDriverState *bs, int flags)
+static QemuOptsList qcow2_runtime_opts = {
+    .name = "qcow2",
+    .head = QTAILQ_HEAD_INITIALIZER(qcow2_runtime_opts.head),
+    .desc = {
+        {
+            .name = "lazy_refcounts",
+            .type = QEMU_OPT_BOOL,
+            .help = "Postpone refcount updates",
+        },
+        { /* end of list */ }
+    },
+};
+
+static int qcow2_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVQcowState *s = bs->opaque;
     int len, i, ret = 0;
     QCowHeader header;
+    QemuOpts *opts;
+    Error *local_err = NULL;
     uint64_t ext_end;
 
     ret = bdrv_pread(bs->file, 0, &header, sizeof(header));
@@ -495,6 +510,28 @@
         }
     }
 
+    /* Enable lazy_refcounts according to image and command line options */
+    opts = qemu_opts_create_nofail(&qcow2_runtime_opts);
+    qemu_opts_absorb_qdict(opts, options, &local_err);
+    if (error_is_set(&local_err)) {
+        qerror_report_err(local_err);
+        error_free(local_err);
+        ret = -EINVAL;
+        goto fail;
+    }
+
+    s->use_lazy_refcounts = qemu_opt_get_bool(opts, "lazy_refcounts",
+        (s->compatible_features & QCOW2_COMPAT_LAZY_REFCOUNTS));
+
+    qemu_opts_del(opts);
+
+    if (s->use_lazy_refcounts && s->qcow_version < 3) {
+        qerror_report(ERROR_CLASS_GENERIC_ERROR, "Lazy refcounts require "
+            "a qcow2 image with at least qemu 1.1 compatibility level");
+        ret = -EINVAL;
+        goto fail;
+    }
+
 #ifdef DEBUG_ALLOC
     {
         BdrvCheckResult result = {0};
@@ -584,7 +621,7 @@
         *pnum = 0;
     }
 
-    return (cluster_offset != 0);
+    return (cluster_offset != 0) || (ret == QCOW2_CLUSTER_ZERO);
 }
 
 /* handle reading after the end of the backing file */
@@ -665,10 +702,6 @@
             break;
 
         case QCOW2_CLUSTER_ZERO:
-            if (s->qcow_version < 3) {
-                ret = -EIO;
-                goto fail;
-            }
             qemu_iovec_memset(&hd_qiov, 0, 0, 512 * cur_nr_sectors);
             break;
 
@@ -912,7 +945,7 @@
     qcow2_close(bs);
 
     memset(s, 0, sizeof(BDRVQcowState));
-    qcow2_open(bs, flags);
+    qcow2_open(bs, NULL, flags);
 
     if (crypt_method) {
         s->crypt_method = crypt_method;
@@ -1265,7 +1298,7 @@
      */
     BlockDriver* drv = bdrv_find_format("qcow2");
     assert(drv != NULL);
-    ret = bdrv_open(bs, filename,
+    ret = bdrv_open(bs, filename, NULL,
         BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, drv);
     if (ret < 0) {
         goto out;
diff --git a/block/qcow2.h b/block/qcow2.h
index 718b52b..103abdb 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -173,6 +173,7 @@
 
     int flags;
     int qcow_version;
+    bool use_lazy_refcounts;
 
     uint64_t incompatible_features;
     uint64_t compatible_features;
diff --git a/block/qed.c b/block/qed.c
index b8515e5..46e12b3 100644
--- a/block/qed.c
+++ b/block/qed.c
@@ -373,7 +373,7 @@
     s->bs = bs;
 }
 
-static int bdrv_qed_open(BlockDriverState *bs, int flags)
+static int bdrv_qed_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVQEDState *s = bs->opaque;
     QEDHeader le_header;
@@ -1526,7 +1526,7 @@
 
     bdrv_qed_close(bs);
     memset(s, 0, sizeof(BDRVQEDState));
-    bdrv_qed_open(bs, bs->open_flags);
+    bdrv_qed_open(bs, NULL, bs->open_flags);
 }
 
 static int bdrv_qed_check(BlockDriverState *bs, BdrvCheckResult *result,
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 4dfdf98..8a3cdbc 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -750,6 +750,7 @@
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
     RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
+    ThreadPool *pool;
 
     acb->bs = bs;
     acb->aio_type = type;
@@ -763,7 +764,8 @@
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -1413,6 +1415,7 @@
 {
     BDRVRawState *s = bs->opaque;
     RawPosixAIOData *acb;
+    ThreadPool *pool;
 
     if (fd_open(bs) < 0)
         return NULL;
@@ -1424,7 +1427,8 @@
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index b89ac19..18e0068 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -144,6 +144,7 @@
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
     RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
+    ThreadPool *pool;
 
     acb->bs = bs;
     acb->hfile = hfile;
@@ -157,7 +158,8 @@
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
diff --git a/block/raw.c b/block/raw.c
index 75812db..ce10422 100644
--- a/block/raw.c
+++ b/block/raw.c
@@ -3,7 +3,7 @@
 #include "block/block_int.h"
 #include "qemu/module.h"
 
-static int raw_open(BlockDriverState *bs, int flags)
+static int raw_open(BlockDriverState *bs, QDict *options, int flags)
 {
     bs->sg = bs->file->sg;
     return 0;
diff --git a/block/sheepdog.c b/block/sheepdog.c
index c711c28..4245328 100644
--- a/block/sheepdog.c
+++ b/block/sheepdog.c
@@ -468,6 +468,8 @@
     if (err != NULL) {
         qerror_report_err(err);
         error_free(err);
+    } else {
+        socket_set_nonblock(fd);
     }
 
     return fd;
@@ -499,6 +501,13 @@
     qemu_coroutine_enter(co, NULL);
 }
 
+static int have_co_req(void *opaque)
+{
+    /* This handler is only installed while a request is pending, so it
+     * always returns 1. */
+    return 1;
+}
+
 typedef struct SheepdogReqCo {
     int sockfd;
     SheepdogReq *hdr;
@@ -521,15 +530,14 @@
     unsigned int *rlen = srco->rlen;
 
     co = qemu_coroutine_self();
-    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, NULL, co);
+    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, have_co_req, co);
 
-    socket_set_block(sockfd);
     ret = send_co_req(sockfd, hdr, data, wlen);
     if (ret < 0) {
         goto out;
     }
 
-    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, NULL, co);
+    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, have_co_req, co);
 
     ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
     if (ret < sizeof(*hdr)) {
@@ -552,8 +560,9 @@
     }
     ret = 0;
 out:
+    /* there is at most one request for this sockfd, so it is safe to
+     * set each handler to NULL. */
     qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
-    socket_set_nonblock(sockfd);
 
     srco->ret = ret;
     srco->finished = true;
@@ -776,8 +785,6 @@
         return fd;
     }
 
-    socket_set_nonblock(fd);
-
     qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
     return fd;
 }
diff --git a/block/vdi.c b/block/vdi.c
index 87c691b..2662d89 100644
--- a/block/vdi.c
+++ b/block/vdi.c
@@ -364,7 +364,7 @@
     return result;
 }
 
-static int vdi_open(BlockDriverState *bs, int flags)
+static int vdi_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVVdiState *s = bs->opaque;
     VdiHeader header;
diff --git a/block/vmdk.c b/block/vmdk.c
index aef1abc..e92104a 100644
--- a/block/vmdk.c
+++ b/block/vmdk.c
@@ -723,7 +723,7 @@
     return vmdk_parse_extents(buf, bs, bs->file->filename);
 }
 
-static int vmdk_open(BlockDriverState *bs, int flags)
+static int vmdk_open(BlockDriverState *bs, QDict *options, int flags)
 {
     int ret;
     BDRVVmdkState *s = bs->opaque;
@@ -1527,7 +1527,7 @@
     if (backing_file) {
         char parent_filename[PATH_MAX];
         BlockDriverState *bs = bdrv_new("");
-        ret = bdrv_open(bs, backing_file, 0, NULL);
+        ret = bdrv_open(bs, backing_file, NULL, 0, NULL);
         if (ret != 0) {
             bdrv_delete(bs);
             return ret;
diff --git a/block/vpc.c b/block/vpc.c
index 82229ef..3cad52e 100644
--- a/block/vpc.c
+++ b/block/vpc.c
@@ -155,7 +155,7 @@
     return 0;
 }
 
-static int vpc_open(BlockDriverState *bs, int flags)
+static int vpc_open(BlockDriverState *bs, QDict *options, int flags)
 {
     BDRVVPCState *s = bs->opaque;
     int i;
diff --git a/block/vvfat.c b/block/vvfat.c
index 06e6654..b8eb38a 100644
--- a/block/vvfat.c
+++ b/block/vvfat.c
@@ -2830,7 +2830,7 @@
         return -1;
     }
 
-    ret = bdrv_open(s->qcow, s->qcow_filename,
+    ret = bdrv_open(s->qcow, s->qcow_filename, NULL,
             BDRV_O_RDWR | BDRV_O_CACHE_WB | BDRV_O_NO_FLUSH, bdrv_qcow);
     if (ret < 0) {
 	return ret;
diff --git a/blockdev.c b/blockdev.c
index 0e67d06..09f76b7 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -5,6 +5,29 @@
  *
  * This work is licensed under the terms of the GNU GPL, version 2 or
  * later.  See the COPYING file in the top-level directory.
+ *
+ * This file incorporates work covered by the following copyright and
+ * permission notice:
+ *
+ * Copyright (c) 2003-2008 Fabrice Bellard
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
  */
 
 #include "sysemu/blockdev.h"
@@ -22,6 +45,7 @@
 #include "sysemu/arch_init.h"
 
 static QTAILQ_HEAD(drivelist, DriveInfo) drives = QTAILQ_HEAD_INITIALIZER(drives);
+extern QemuOptsList qemu_common_drive_opts;
 
 static const char *const if_name[IF_COUNT] = {
     [IF_NONE] = "none",
@@ -191,6 +215,7 @@
     bdrv_delete(dinfo->bdrv);
     g_free(dinfo->id);
     QTAILQ_REMOVE(&drives, dinfo, next);
+    g_free(dinfo->serial);
     g_free(dinfo);
 }
 
@@ -287,7 +312,7 @@
     return true;
 }
 
-DriveInfo *drive_init(QemuOpts *opts, BlockInterfaceType block_default_type)
+DriveInfo *drive_init(QemuOpts *all_opts, BlockInterfaceType block_default_type)
 {
     const char *buf;
     const char *file = NULL;
@@ -310,10 +335,36 @@
     bool copy_on_read;
     int ret;
     Error *error = NULL;
+    QemuOpts *opts;
+    QDict *bs_opts;
+    const char *id;
 
     translation = BIOS_ATA_TRANSLATION_AUTO;
     media = MEDIA_DISK;
 
+    /* Check common options by copying them from all_opts to opts; all other
+     * options are stored in bs_opts. */
+    id = qemu_opts_id(all_opts);
+    opts = qemu_opts_create(&qemu_common_drive_opts, id, 1, &error);
+    if (error_is_set(&error)) {
+        qerror_report_err(error);
+        error_free(error);
+        return NULL;
+    }
+
+    bs_opts = qdict_new();
+    qemu_opts_to_qdict(all_opts, bs_opts);
+    qemu_opts_absorb_qdict(opts, bs_opts, &error);
+    if (error_is_set(&error)) {
+        qerror_report_err(error);
+        error_free(error);
+        return NULL;
+    }
+
+    if (id) {
+        qdict_del(bs_opts, "id");
+    }
+
     /* extract parameters */
     bus_id  = qemu_opt_get_number(opts, "bus", 0);
     unit_id = qemu_opt_get_number(opts, "unit", -1);
@@ -564,9 +615,11 @@
     dinfo->heads = heads;
     dinfo->secs = secs;
     dinfo->trans = translation;
-    dinfo->opts = opts;
+    dinfo->opts = all_opts;
     dinfo->refcount = 1;
-    dinfo->serial = serial;
+    if (serial != NULL) {
+        dinfo->serial = g_strdup(serial);
+    }
     QTAILQ_INSERT_TAIL(&drives, dinfo, next);
 
     bdrv_set_on_error(dinfo->bdrv, on_read_error, on_write_error);
@@ -587,17 +640,20 @@
     case IF_MTD:
         break;
     case IF_VIRTIO:
+    {
         /* add virtio block device */
-        opts = qemu_opts_create_nofail(qemu_find_opts("device"));
+        QemuOpts *devopts;
+        devopts = qemu_opts_create_nofail(qemu_find_opts("device"));
         if (arch_type == QEMU_ARCH_S390X) {
-            qemu_opt_set(opts, "driver", "virtio-blk-s390");
+            qemu_opt_set(devopts, "driver", "virtio-blk-s390");
         } else {
-            qemu_opt_set(opts, "driver", "virtio-blk-pci");
+            qemu_opt_set(devopts, "driver", "virtio-blk-pci");
         }
-        qemu_opt_set(opts, "drive", dinfo->id);
+        qemu_opt_set(devopts, "drive", dinfo->id);
         if (devaddr)
-            qemu_opt_set(opts, "addr", devaddr);
+            qemu_opt_set(devopts, "addr", devaddr);
         break;
+    }
     default:
         abort();
     }
@@ -635,7 +691,9 @@
         error_report("warning: disabling copy_on_read on readonly drive");
     }
 
-    ret = bdrv_open(dinfo->bdrv, file, bdrv_flags, drv);
+    ret = bdrv_open(dinfo->bdrv, file, bs_opts, bdrv_flags, drv);
+    bs_opts = NULL;
+
     if (ret < 0) {
         if (ret == -EMEDIUMTYPE) {
             error_report("could not open disk image %s: not in %s format",
@@ -649,9 +707,14 @@
 
     if (bdrv_key_required(dinfo->bdrv))
         autostart = 0;
+
+    qemu_opts_del(opts);
+
     return dinfo;
 
 err:
+    qemu_opts_del(opts);
+    QDECREF(bs_opts);
     bdrv_delete(dinfo->bdrv);
     g_free(dinfo->id);
     QTAILQ_REMOVE(&drives, dinfo, next);
@@ -820,7 +883,9 @@
 
         /* We will manually add the backing_hd field to the bs later */
         states->new_bs = bdrv_new("");
-        ret = bdrv_open(states->new_bs, new_image_file,
+        /* TODO Inherit bs->options or only take explicit options with an
+         * extended QMP command? */
+        ret = bdrv_open(states->new_bs, new_image_file, NULL,
                         flags | BDRV_O_NO_BACKING, drv);
         if (ret != 0) {
             error_set(errp, QERR_OPEN_FILE_FAILED, new_image_file);
@@ -921,7 +986,7 @@
                                     int bdrv_flags, BlockDriver *drv,
                                     const char *password, Error **errp)
 {
-    if (bdrv_open(bs, filename, bdrv_flags, drv) < 0) {
+    if (bdrv_open(bs, filename, NULL, bdrv_flags, drv) < 0) {
         error_set(errp, QERR_OPEN_FILE_FAILED, filename);
         return;
     }
@@ -1330,7 +1395,7 @@
      * file.
      */
     target_bs = bdrv_new("");
-    ret = bdrv_open(target_bs, target, flags | BDRV_O_NO_BACKING, drv);
+    ret = bdrv_open(target_bs, target, NULL, flags | BDRV_O_NO_BACKING, drv);
 
     if (ret < 0) {
         bdrv_delete(target_bs);
@@ -1459,9 +1524,9 @@
     return dummy.next;
 }
 
-QemuOptsList qemu_drive_opts = {
+QemuOptsList qemu_common_drive_opts = {
     .name = "drive",
-    .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head),
+    .head = QTAILQ_HEAD_INITIALIZER(qemu_common_drive_opts.head),
     .desc = {
         {
             .name = "bus",
@@ -1580,3 +1645,15 @@
         { /* end of list */ }
     },
 };
+
+QemuOptsList qemu_drive_opts = {
+    .name = "drive",
+    .head = QTAILQ_HEAD_INITIALIZER(qemu_drive_opts.head),
+    .desc = {
+        /*
+         * no elements => accept any params
+         * validation will happen later
+         */
+        { /* end of list */ }
+    },
+};
diff --git a/hw/dataplane/virtio-blk.c b/hw/dataplane/virtio-blk.c
index dfe5f9b..1242d61 100644
--- a/hw/dataplane/virtio-blk.c
+++ b/hw/dataplane/virtio-blk.c
@@ -263,6 +263,11 @@
     }
 }
 
+static int flush_true(EventNotifier *e)
+{
+    return true;
+}
+
 static void handle_notify(EventNotifier *e)
 {
     VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
@@ -342,6 +347,14 @@
     }
 }
 
+static int flush_io(EventNotifier *e)
+{
+    VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
+                                           io_notifier);
+
+    return s->num_reqs > 0;
+}
+
 static void handle_io(EventNotifier *e)
 {
     VirtIOBlockDataPlane *s = container_of(e, VirtIOBlockDataPlane,
@@ -472,7 +485,7 @@
         exit(1);
     }
     s->host_notifier = *virtio_queue_get_host_notifier(vq);
-    aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, NULL);
+    aio_set_event_notifier(s->ctx, &s->host_notifier, handle_notify, flush_true);
 
     /* Set up ioqueue */
     ioq_init(&s->ioqueue, s->fd, REQ_MAX);
@@ -480,7 +493,7 @@
         ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
     }
     s->io_notifier = *ioq_get_notifier(&s->ioqueue);
-    aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, NULL);
+    aio_set_event_notifier(s->ctx, &s->io_notifier, handle_io, flush_io);
 
     s->started = true;
     trace_virtio_blk_data_plane_start(s);
diff --git a/hw/xen_disk.c b/hw/xen_disk.c
index cc09a2f..83329e2 100644
--- a/hw/xen_disk.c
+++ b/hw/xen_disk.c
@@ -763,7 +763,7 @@
         xen_be_printf(&blkdev->xendev, 2, "create new bdrv (xenbus setup)\n");
         blkdev->bs = bdrv_new(blkdev->dev);
         if (blkdev->bs) {
-            if (bdrv_open(blkdev->bs, blkdev->filename, qflags,
+            if (bdrv_open(blkdev->bs, blkdev->filename, NULL, qflags,
                         bdrv_find_whitelisted_format(blkdev->fileproto)) != 0) {
                 bdrv_delete(blkdev->bs);
                 blkdev->bs = NULL;
diff --git a/include/block/aio.h b/include/block/aio.h
index 5b54d38..1836793 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -66,6 +66,9 @@
 
     /* GPollFDs for aio_poll() */
     GArray *pollfds;
+
+    /* Thread pool for performing work and receiving completion callbacks */
+    struct ThreadPool *thread_pool;
 } AioContext;
 
 /* Returns 1 if there are still outstanding AIO requests; 0 otherwise */
@@ -223,6 +226,9 @@
  */
 GSource *aio_get_g_source(AioContext *ctx);
 
+/* Return the ThreadPool bound to this AioContext, creating it on first use */
+struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
+
 /* Functions to operate on the main QEMU AioContext.  */
 
 bool qemu_aio_wait(void);
diff --git a/include/block/block.h b/include/block/block.h
index 0f750d7..d4f34d6 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -137,8 +137,8 @@
 int bdrv_parse_discard_flags(const char *mode, int *flags);
 int bdrv_file_open(BlockDriverState **pbs, const char *filename, int flags);
 int bdrv_open_backing_file(BlockDriverState *bs);
-int bdrv_open(BlockDriverState *bs, const char *filename, int flags,
-              BlockDriver *drv);
+int bdrv_open(BlockDriverState *bs, const char *filename, QDict *options,
+              int flags, BlockDriver *drv);
 BlockReopenQueue *bdrv_reopen_queue(BlockReopenQueue *bs_queue,
                                     BlockDriverState *bs, int flags);
 int bdrv_reopen_multiple(BlockReopenQueue *bs_queue, Error **errp);
diff --git a/include/block/block_int.h b/include/block/block_int.h
index eaad53e..ce0aa26 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -82,7 +82,7 @@
     void (*bdrv_reopen_commit)(BDRVReopenState *reopen_state);
     void (*bdrv_reopen_abort)(BDRVReopenState *reopen_state);
 
-    int (*bdrv_open)(BlockDriverState *bs, int flags);
+    int (*bdrv_open)(BlockDriverState *bs, QDict *options, int flags);
     int (*bdrv_file_open)(BlockDriverState *bs, const char *filename, int flags);
     int (*bdrv_read)(BlockDriverState *bs, int64_t sector_num,
                      uint8_t *buf, int nb_sectors);
@@ -286,6 +286,7 @@
     /* long-running background operation */
     BlockJob *job;
 
+    QDict *options;
 };
 
 int get_tmp_filename(char *filename, int size);
@@ -293,6 +294,13 @@
 void bdrv_set_io_limits(BlockDriverState *bs,
                         BlockIOLimit *io_limits);
 
+/**
+ * bdrv_get_aio_context:
+ *
+ * Returns: the currently bound #AioContext
+ */
+AioContext *bdrv_get_aio_context(BlockDriverState *bs);
+
 #ifdef _WIN32
 int is_windows_drive(const char *filename);
 #endif
diff --git a/include/block/coroutine.h b/include/block/coroutine.h
index c31fae3..a978162 100644
--- a/include/block/coroutine.h
+++ b/include/block/coroutine.h
@@ -104,6 +104,7 @@
  */
 typedef struct CoQueue {
     QTAILQ_HEAD(, Coroutine) entries;
+    AioContext *ctx;
 } CoQueue;
 
 /**
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 200703e..32afcdd 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -26,9 +26,16 @@
 
 typedef int ThreadPoolFunc(void *opaque);
 
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
-     BlockDriverCompletionFunc *cb, void *opaque);
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
-void thread_pool_submit(ThreadPoolFunc *func, void *arg);
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(struct AioContext *ctx);
+void thread_pool_free(ThreadPool *pool);
+
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg,
+        BlockDriverCompletionFunc *cb, void *opaque);
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg);
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
 
 #endif
diff --git a/include/qapi/qmp/qdict.h b/include/qapi/qmp/qdict.h
index 6d9a4be..685b2e3 100644
--- a/include/qapi/qmp/qdict.h
+++ b/include/qapi/qmp/qdict.h
@@ -64,4 +64,6 @@
 int qdict_get_try_bool(const QDict *qdict, const char *key, int def_value);
 const char *qdict_get_try_str(const QDict *qdict, const char *key);
 
+QDict *qdict_clone_shallow(const QDict *src);
+
 #endif /* QDICT_H */
diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
index 0995288..6f0200a 100644
--- a/include/qemu/main-loop.h
+++ b/include/qemu/main-loop.h
@@ -82,6 +82,11 @@
 int main_loop_wait(int nonblocking);
 
 /**
+ * qemu_get_aio_context: Return the main loop's AioContext
+ */
+AioContext *qemu_get_aio_context(void);
+
+/**
  * qemu_notify_event: Force processing of pending events.
  *
  * Similar to signaling a condition variable, qemu_notify_event forces
diff --git a/include/qemu/option.h b/include/qemu/option.h
index ba197cd..bdb6d21 100644
--- a/include/qemu/option.h
+++ b/include/qemu/option.h
@@ -149,6 +149,7 @@
 QemuOpts *qemu_opts_from_qdict(QemuOptsList *list, const QDict *qdict,
                                Error **errp);
 QDict *qemu_opts_to_qdict(QemuOpts *opts, QDict *qdict);
+void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp);
 
 typedef int (*qemu_opts_loopfunc)(QemuOpts *opts, void *opaque);
 int qemu_opts_print(QemuOpts *opts, void *dummy);
diff --git a/include/sysemu/blockdev.h b/include/sysemu/blockdev.h
index 1fe5332..804ec88 100644
--- a/include/sysemu/blockdev.h
+++ b/include/sysemu/blockdev.h
@@ -40,7 +40,7 @@
     int media_cd;
     int cyls, heads, secs, trans;
     QemuOpts *opts;
-    const char *serial;
+    char *serial;
     QTAILQ_ENTRY(DriveInfo) next;
     int refcount;
 };
diff --git a/main-loop.c b/main-loop.c
index 8c9b58c..eb80ff3 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -109,6 +109,11 @@
 
 static AioContext *qemu_aio_context;
 
+AioContext *qemu_get_aio_context(void)
+{
+    return qemu_aio_context;
+}
+
 void qemu_notify_event(void)
 {
     if (!qemu_aio_context) {
diff --git a/qemu-coroutine-lock.c b/qemu-coroutine-lock.c
index 97ef01c..86efe1f 100644
--- a/qemu-coroutine-lock.c
+++ b/qemu-coroutine-lock.c
@@ -29,28 +29,36 @@
 #include "block/aio.h"
 #include "trace.h"
 
-static QTAILQ_HEAD(, Coroutine) unlock_bh_queue =
-    QTAILQ_HEAD_INITIALIZER(unlock_bh_queue);
-static QEMUBH* unlock_bh;
+/* Coroutines are awoken from a BH to allow the current coroutine to complete
+ * its flow of execution.  The BH may run after the CoQueue has been destroyed,
+ * so keep BH data in a separate heap-allocated struct.
+ */
+typedef struct {
+    QEMUBH *bh;
+    QTAILQ_HEAD(, Coroutine) entries;
+} CoQueueNextData;
 
 static void qemu_co_queue_next_bh(void *opaque)
 {
+    CoQueueNextData *data = opaque;
     Coroutine *next;
 
     trace_qemu_co_queue_next_bh();
-    while ((next = QTAILQ_FIRST(&unlock_bh_queue))) {
-        QTAILQ_REMOVE(&unlock_bh_queue, next, co_queue_next);
+    while ((next = QTAILQ_FIRST(&data->entries))) {
+        QTAILQ_REMOVE(&data->entries, next, co_queue_next);
         qemu_coroutine_enter(next, NULL);
     }
+
+    qemu_bh_delete(data->bh);
+    g_slice_free(CoQueueNextData, data);
 }
 
 void qemu_co_queue_init(CoQueue *queue)
 {
     QTAILQ_INIT(&queue->entries);
 
-    if (!unlock_bh) {
-        unlock_bh = qemu_bh_new(qemu_co_queue_next_bh, NULL);
-    }
+    /* This will be exposed to callers once there are multiple AioContexts */
+    queue->ctx = qemu_get_aio_context();
 }
 
 void coroutine_fn qemu_co_queue_wait(CoQueue *queue)
@@ -69,26 +77,39 @@
     assert(qemu_in_coroutine());
 }
 
-bool qemu_co_queue_next(CoQueue *queue)
+static bool qemu_co_queue_do_restart(CoQueue *queue, bool single)
 {
     Coroutine *next;
+    CoQueueNextData *data;
 
-    next = QTAILQ_FIRST(&queue->entries);
-    if (next) {
-        QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
-        QTAILQ_INSERT_TAIL(&unlock_bh_queue, next, co_queue_next);
-        trace_qemu_co_queue_next(next);
-        qemu_bh_schedule(unlock_bh);
+    if (QTAILQ_EMPTY(&queue->entries)) {
+        return false;
     }
 
-    return (next != NULL);
+    data = g_slice_new(CoQueueNextData);
+    data->bh = aio_bh_new(queue->ctx, qemu_co_queue_next_bh, data);
+    QTAILQ_INIT(&data->entries);
+    qemu_bh_schedule(data->bh);
+
+    while ((next = QTAILQ_FIRST(&queue->entries)) != NULL) {
+        QTAILQ_REMOVE(&queue->entries, next, co_queue_next);
+        QTAILQ_INSERT_TAIL(&data->entries, next, co_queue_next);
+        trace_qemu_co_queue_next(next);
+        if (single) {
+            break;
+        }
+    }
+    return true;
+}
+
+bool qemu_co_queue_next(CoQueue *queue)
+{
+    return qemu_co_queue_do_restart(queue, true);
 }
 
 void qemu_co_queue_restart_all(CoQueue *queue)
 {
-    while (qemu_co_queue_next(queue)) {
-        /* Do nothing */
-    }
+    qemu_co_queue_do_restart(queue, false);
 }
 
 bool qemu_co_queue_empty(CoQueue *queue)
diff --git a/qemu-img.c b/qemu-img.c
index 471de7d..31627b0 100644
--- a/qemu-img.c
+++ b/qemu-img.c
@@ -276,7 +276,7 @@
         drv = NULL;
     }
 
-    ret = bdrv_open(bs, filename, flags, drv);
+    ret = bdrv_open(bs, filename, NULL, flags, drv);
     if (ret < 0) {
         error_report("Could not open '%s': %s", filename, strerror(-ret));
         goto fail;
@@ -2156,7 +2156,7 @@
 
         bs_old_backing = bdrv_new("old_backing");
         bdrv_get_backing_filename(bs, backing_name, sizeof(backing_name));
-        ret = bdrv_open(bs_old_backing, backing_name, BDRV_O_FLAGS,
+        ret = bdrv_open(bs_old_backing, backing_name, NULL, BDRV_O_FLAGS,
                         old_backing_drv);
         if (ret) {
             error_report("Could not open old backing file '%s'", backing_name);
@@ -2164,7 +2164,7 @@
         }
         if (out_baseimg[0]) {
             bs_new_backing = bdrv_new("new_backing");
-            ret = bdrv_open(bs_new_backing, out_baseimg, BDRV_O_FLAGS,
+            ret = bdrv_open(bs_new_backing, out_baseimg, NULL, BDRV_O_FLAGS,
                         new_backing_drv);
             if (ret) {
                 error_report("Could not open new backing file '%s'",
diff --git a/qemu-io.c b/qemu-io.c
index 7b3de42..79be516 100644
--- a/qemu-io.c
+++ b/qemu-io.c
@@ -1773,7 +1773,7 @@
     } else {
         bs = bdrv_new("hda");
 
-        if (bdrv_open(bs, name, flags, NULL) < 0) {
+        if (bdrv_open(bs, name, NULL, flags, NULL) < 0) {
             fprintf(stderr, "%s: can't open device %s\n", progname, name);
             bdrv_delete(bs);
             bs = NULL;
diff --git a/qemu-nbd.c b/qemu-nbd.c
index e7268d0..ca722ed 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -557,7 +557,7 @@
 
     bs = bdrv_new("hda");
     srcpath = argv[optind];
-    if ((ret = bdrv_open(bs, srcpath, flags, NULL)) < 0) {
+    if ((ret = bdrv_open(bs, srcpath, NULL, flags, NULL)) < 0) {
         errno = -ret;
         err(EXIT_FAILURE, "Failed to bdrv_open '%s'", argv[optind]);
     }
diff --git a/qobject/qdict.c b/qobject/qdict.c
index 7543ccc..ed381f9 100644
--- a/qobject/qdict.c
+++ b/qobject/qdict.c
@@ -401,6 +401,28 @@
 }
 
 /**
+ * qdict_clone_shallow(): Create a new QDict with the same entries as @src. The
+ * values are not copied: the clone shares them and takes a reference on each.
+ */
+QDict *qdict_clone_shallow(const QDict *src)
+{
+    QDict *dest;
+    QDictEntry *entry;
+    int i;
+
+    dest = qdict_new();
+
+    for (i = 0; i < QDICT_BUCKET_MAX; i++) {
+        QLIST_FOREACH(entry, &src->table[i], next) {
+            qobject_incref(entry->value);
+            qdict_put_obj(dest, entry->key, entry->value);
+        }
+    }
+
+    return dest;
+}
+
+/**
  * qentry_destroy(): Free all the memory allocated by a QDictEntry
  */
 static void qentry_destroy(QDictEntry *e)
diff --git a/tests/qemu-iotests/007 b/tests/qemu-iotests/007
index 0139264..c454f2c 100755
--- a/tests/qemu-iotests/007
+++ b/tests/qemu-iotests/007
@@ -50,10 +50,9 @@
 
 for i in `seq 1 10`; do
     echo "savevm $i"
-    # XXX(hch): adding -nographic would be good, but hangs the test
-    $QEMU -hda $TEST_IMG -monitor stdio >/dev/null 2>&1 <<EOF     
-savevm test-$i                                                              
-quit                                                                        
+    $QEMU -nographic -hda $TEST_IMG -serial none -monitor stdio >/dev/null 2>&1 <<EOF
+savevm test-$i
+quit
 EOF
 done
 
diff --git a/tests/qemu-iotests/050 b/tests/qemu-iotests/050
new file mode 100755
index 0000000..05793e2
--- /dev/null
+++ b/tests/qemu-iotests/050
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Test qemu-img rebase with zero clusters
+#
+# Copyright (C) 2013 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+# creator
+owner=pbonzini@redhat.com
+
+seq=`basename $0`
+echo "QA output created by $seq"
+
+here=`pwd`
+tmp=/tmp/$$
+status=1	# failure is the default!
+
+_cleanup()
+{
+    _cleanup_test_img
+    rm -f $TEST_IMG.old
+    rm -f $TEST_IMG.new
+}
+trap "_cleanup; exit \$status" 0 1 2 3 15
+
+# get standard environment, filters and checks
+. ./common.rc
+. ./common.filter
+
+_supported_fmt qcow2 qed
+_supported_proto file
+_supported_os Linux
+
+if test "$IMGFMT" = qcow2 && test "$IMGOPTS" = ""; then
+  IMGOPTS=compat=1.1    # no image options given: use compat=1.1
+fi
+
+echo
+echo "== Creating images =="
+
+size=10M
+_make_test_img $size
+$QEMU_IO -c "write -P 0x40 0 1048576" $TEST_IMG | _filter_qemu_io
+mv $TEST_IMG $TEST_IMG.old
+
+_make_test_img $size
+$QEMU_IO -c "write -P 0x5a 0 1048576" $TEST_IMG | _filter_qemu_io
+mv $TEST_IMG $TEST_IMG.new
+
+_make_test_img -b $TEST_IMG.old $size
+$QEMU_IO -c "write -z 0 1048576" $TEST_IMG | _filter_qemu_io
+
+echo
+echo "== Rebasing the image =="
+
+$QEMU_IMG rebase -b $TEST_IMG.new $TEST_IMG
+$QEMU_IO -c "read -P 0x00 0 1048576" $TEST_IMG | _filter_qemu_io
+
+# success, all done
+echo "*** done"
+rm -f $seq.full
+status=0
diff --git a/tests/qemu-iotests/050.out b/tests/qemu-iotests/050.out
new file mode 100644
index 0000000..3f5f7e1
--- /dev/null
+++ b/tests/qemu-iotests/050.out
@@ -0,0 +1,17 @@
+QA output created by 050
+
+== Creating images ==
+Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 
+wrote 1048576/1048576 bytes at offset 0
+1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 
+wrote 1048576/1048576 bytes at offset 0
+1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+Formatting 'TEST_DIR/t.IMGFMT', fmt=IMGFMT size=10485760 backing_file='TEST_DIR/t.IMGFMT.old' 
+wrote 1048576/1048576 bytes at offset 0
+1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+
+== Rebasing the image ==
+read 1048576/1048576 bytes at offset 0
+1 MiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
+*** done
diff --git a/tests/qemu-iotests/group b/tests/qemu-iotests/group
index fcf57e0..1d7e4f3 100644
--- a/tests/qemu-iotests/group
+++ b/tests/qemu-iotests/group
@@ -56,3 +56,4 @@
 047 rw auto
 048 img auto quick
 049 rw auto
+050 rw auto backing quick
diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c
index 9998e03..22915aa 100644
--- a/tests/test-thread-pool.c
+++ b/tests/test-thread-pool.c
@@ -4,6 +4,8 @@
 #include "block/thread-pool.h"
 #include "block/block.h"
 
+static AioContext *ctx;
+static ThreadPool *pool;
 static int active;
 
 typedef struct {
@@ -38,19 +40,10 @@
     active--;
 }
 
-/* A non-blocking poll of the main AIO context (we cannot use aio_poll
- * because we do not know the AioContext).
- */
-static void qemu_aio_wait_nonblocking(void)
-{
-    qemu_notify_event();
-    qemu_aio_wait();
-}
-
 /* Wait until all aio and bh activity has finished */
 static void qemu_aio_wait_all(void)
 {
-    while (qemu_aio_wait()) {
+    while (aio_poll(ctx, true)) {
         /* Do nothing */
     }
 }
@@ -58,7 +51,7 @@
 static void test_submit(void)
 {
     WorkerTestData data = { .n = 0 };
-    thread_pool_submit(worker_cb, &data);
+    thread_pool_submit(pool, worker_cb, &data);
     qemu_aio_wait_all();
     g_assert_cmpint(data.n, ==, 1);
 }
@@ -66,7 +59,8 @@
 static void test_submit_aio(void)
 {
     WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
-    data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
+    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
+                                        done_cb, &data);
 
     /* The callbacks are not called until after the first wait.  */
     active = 1;
@@ -84,7 +78,7 @@
     active = 1;
     data->n = 0;
     data->ret = -EINPROGRESS;
-    thread_pool_submit_co(worker_cb, data);
+    thread_pool_submit_co(pool, worker_cb, data);
 
     /* The test continues in test_submit_co, after qemu_coroutine_enter... */
 
@@ -126,12 +120,12 @@
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
+        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
     }
 
     active = 100;
     while (active > 0) {
-        qemu_aio_wait();
+        aio_poll(ctx, true);
     }
     for (i = 0; i < 100; i++) {
         g_assert_cmpint(data[i].n, ==, 1);
@@ -154,7 +148,7 @@
     for (i = 0; i < 100; i++) {
         data[i].n = 0;
         data[i].ret = -EINPROGRESS;
-        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
+        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
                                                done_cb, &data[i]);
     }
 
@@ -162,7 +156,8 @@
      * run, but do not waste too much time...
      */
     active = 100;
-    qemu_aio_wait_nonblocking();
+    aio_notify(ctx);
+    aio_poll(ctx, false);
 
     /* Wait some time for the threads to start, with some sanity
      * testing on the behavior of the scheduler...
@@ -208,11 +203,10 @@
 
 int main(int argc, char **argv)
 {
-    /* These should be removed once each AioContext has its thread pool.
-     * The test should create its own AioContext.
-     */
-    qemu_init_main_loop();
-    bdrv_init();
+    int ret;
+
+    ctx = aio_context_new();
+    pool = aio_get_thread_pool(ctx);
 
     g_test_init(&argc, &argv, NULL);
     g_test_add_func("/thread-pool/submit", test_submit);
@@ -220,5 +214,9 @@
     g_test_add_func("/thread-pool/submit-co", test_submit_co);
     g_test_add_func("/thread-pool/submit-many", test_submit_many);
     g_test_add_func("/thread-pool/cancel", test_cancel);
-    return g_test_run();
+
+    ret = g_test_run();
+
+    aio_context_unref(ctx);
+    return ret;
 }
diff --git a/thread-pool.c b/thread-pool.c
index e3ca64d..0ebd4c2 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -24,7 +24,7 @@
 #include "qemu/event_notifier.h"
 #include "block/thread-pool.h"
 
-static void do_spawn_thread(void);
+static void do_spawn_thread(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
 
@@ -37,6 +37,7 @@
 
 struct ThreadPoolElement {
     BlockDriverAIOCB common;
+    ThreadPool *pool;
     ThreadPoolFunc *func;
     void *arg;
 
@@ -54,49 +55,56 @@
     QLIST_ENTRY(ThreadPoolElement) all;
 };
 
-static EventNotifier notifier;
-static QemuMutex lock;
-static QemuCond check_cancel;
-static QemuSemaphore sem;
-static int max_threads = 64;
-static QEMUBH *new_thread_bh;
+struct ThreadPool {
+    EventNotifier notifier;
+    AioContext *ctx;
+    QemuMutex lock;
+    QemuCond check_cancel;
+    QemuCond worker_stopped;
+    QemuSemaphore sem;
+    int max_threads;
+    QEMUBH *new_thread_bh;
 
-/* The following variables are protected by the global mutex.  */
-static QLIST_HEAD(, ThreadPoolElement) head;
+    /* The following variables are only accessed from one AioContext. */
+    QLIST_HEAD(, ThreadPoolElement) head;
 
-/* The following variables are protected by lock.  */
-static QTAILQ_HEAD(, ThreadPoolElement) request_list;
-static int cur_threads;
-static int idle_threads;
-static int new_threads;     /* backlog of threads we need to create */
-static int pending_threads; /* threads created but not running yet */
-static int pending_cancellations; /* whether we need a cond_broadcast */
+    /* The following variables are protected by lock.  */
+    QTAILQ_HEAD(, ThreadPoolElement) request_list;
+    int cur_threads;
+    int idle_threads;
+    int new_threads;     /* backlog of threads we need to create */
+    int pending_threads; /* threads created but not running yet */
+    int pending_cancellations; /* whether we need a cond_broadcast */
+    bool stopping;
+};
 
-static void *worker_thread(void *unused)
+static void *worker_thread(void *opaque)
 {
-    qemu_mutex_lock(&lock);
-    pending_threads--;
-    do_spawn_thread();
+    ThreadPool *pool = opaque;
 
-    while (1) {
+    qemu_mutex_lock(&pool->lock);
+    pool->pending_threads--;
+    do_spawn_thread(pool);
+
+    while (!pool->stopping) {
         ThreadPoolElement *req;
         int ret;
 
         do {
-            idle_threads++;
-            qemu_mutex_unlock(&lock);
-            ret = qemu_sem_timedwait(&sem, 10000);
-            qemu_mutex_lock(&lock);
-            idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&request_list));
-        if (ret == -1) {
+            pool->idle_threads++;
+            qemu_mutex_unlock(&pool->lock);
+            ret = qemu_sem_timedwait(&pool->sem, 10000);
+            qemu_mutex_lock(&pool->lock);
+            pool->idle_threads--;
+        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
+        if (ret == -1 || pool->stopping) {
             break;
         }
 
-        req = QTAILQ_FIRST(&request_list);
-        QTAILQ_REMOVE(&request_list, req, reqs);
+        req = QTAILQ_FIRST(&pool->request_list);
+        QTAILQ_REMOVE(&pool->request_list, req, reqs);
         req->state = THREAD_ACTIVE;
-        qemu_mutex_unlock(&lock);
+        qemu_mutex_unlock(&pool->lock);
 
         ret = req->func(req->arg);
 
@@ -105,45 +113,48 @@
         smp_wmb();
         req->state = THREAD_DONE;
 
-        qemu_mutex_lock(&lock);
-        if (pending_cancellations) {
-            qemu_cond_broadcast(&check_cancel);
+        qemu_mutex_lock(&pool->lock);
+        if (pool->pending_cancellations) {
+            qemu_cond_broadcast(&pool->check_cancel);
         }
 
-        event_notifier_set(&notifier);
+        event_notifier_set(&pool->notifier);
     }
 
-    cur_threads--;
-    qemu_mutex_unlock(&lock);
+    pool->cur_threads--;
+    qemu_cond_signal(&pool->worker_stopped);
+    qemu_mutex_unlock(&pool->lock);
     return NULL;
 }
 
-static void do_spawn_thread(void)
+static void do_spawn_thread(ThreadPool *pool)
 {
     QemuThread t;
 
     /* Runs with lock taken.  */
-    if (!new_threads) {
+    if (!pool->new_threads) {
         return;
     }
 
-    new_threads--;
-    pending_threads++;
+    pool->new_threads--;
+    pool->pending_threads++;
 
-    qemu_thread_create(&t, worker_thread, NULL, QEMU_THREAD_DETACHED);
+    qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
 }
 
 static void spawn_thread_bh_fn(void *opaque)
 {
-    qemu_mutex_lock(&lock);
-    do_spawn_thread();
-    qemu_mutex_unlock(&lock);
+    ThreadPool *pool = opaque;
+
+    qemu_mutex_lock(&pool->lock);
+    do_spawn_thread(pool);
+    qemu_mutex_unlock(&pool->lock);
 }
 
-static void spawn_thread(void)
+static void spawn_thread(ThreadPool *pool)
 {
-    cur_threads++;
-    new_threads++;
+    pool->cur_threads++;
+    pool->new_threads++;
     /* If there are threads being created, they will spawn new workers, so
      * we don't spend time creating many threads in a loop holding a mutex or
      * starving the current vcpu.
@@ -151,23 +162,25 @@
      * If there are no idle threads, ask the main thread to create one, so we
      * inherit the correct affinity instead of the vcpu affinity.
      */
-    if (!pending_threads) {
-        qemu_bh_schedule(new_thread_bh);
+    if (!pool->pending_threads) {
+        qemu_bh_schedule(pool->new_thread_bh);
     }
 }
 
 static void event_notifier_ready(EventNotifier *notifier)
 {
+    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
     ThreadPoolElement *elem, *next;
 
     event_notifier_test_and_clear(notifier);
 restart:
-    QLIST_FOREACH_SAFE(elem, &head, all, next) {
+    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
         if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
             continue;
         }
         if (elem->state == THREAD_DONE) {
-            trace_thread_pool_complete(elem, elem->common.opaque, elem->ret);
+            trace_thread_pool_complete(pool, elem, elem->common.opaque,
+                                       elem->ret);
         }
         if (elem->state == THREAD_DONE && elem->common.cb) {
             QLIST_REMOVE(elem, all);
@@ -186,34 +199,36 @@
 
 static int thread_pool_active(EventNotifier *notifier)
 {
-    return !QLIST_EMPTY(&head);
+    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
+    return !QLIST_EMPTY(&pool->head);
 }
 
 static void thread_pool_cancel(BlockDriverAIOCB *acb)
 {
     ThreadPoolElement *elem = (ThreadPoolElement *)acb;
+    ThreadPool *pool = elem->pool;
 
     trace_thread_pool_cancel(elem, elem->common.opaque);
 
-    qemu_mutex_lock(&lock);
+    qemu_mutex_lock(&pool->lock);
     if (elem->state == THREAD_QUEUED &&
         /* No thread has yet started working on elem. we can try to "steal"
          * the item from the worker if we can get a signal from the
          * semaphore.  Because this is non-blocking, we can do it with
          * the lock taken and ensure that elem will remain THREAD_QUEUED.
          */
-        qemu_sem_timedwait(&sem, 0) == 0) {
-        QTAILQ_REMOVE(&request_list, elem, reqs);
+        qemu_sem_timedwait(&pool->sem, 0) == 0) {
+        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
         elem->state = THREAD_CANCELED;
-        event_notifier_set(&notifier);
+        event_notifier_set(&pool->notifier);
     } else {
-        pending_cancellations++;
+        pool->pending_cancellations++;
         while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
-            qemu_cond_wait(&check_cancel, &lock);
+            qemu_cond_wait(&pool->check_cancel, &pool->lock);
         }
-        pending_cancellations--;
+        pool->pending_cancellations--;
     }
-    qemu_mutex_unlock(&lock);
+    qemu_mutex_unlock(&pool->lock);
 }
 
 static const AIOCBInfo thread_pool_aiocb_info = {
@@ -221,7 +236,8 @@
     .cancel             = thread_pool_cancel,
 };
 
-BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
+BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg,
         BlockDriverCompletionFunc *cb, void *opaque)
 {
     ThreadPoolElement *req;
@@ -230,18 +246,19 @@
     req->func = func;
     req->arg = arg;
     req->state = THREAD_QUEUED;
+    req->pool = pool;
 
-    QLIST_INSERT_HEAD(&head, req, all);
+    QLIST_INSERT_HEAD(&pool->head, req, all);
 
-    trace_thread_pool_submit(req, arg);
+    trace_thread_pool_submit(pool, req, arg);
 
-    qemu_mutex_lock(&lock);
-    if (idle_threads == 0 && cur_threads < max_threads) {
-        spawn_thread();
+    qemu_mutex_lock(&pool->lock);
+    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
+        spawn_thread(pool);
     }
-    QTAILQ_INSERT_TAIL(&request_list, req, reqs);
-    qemu_mutex_unlock(&lock);
-    qemu_sem_post(&sem);
+    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
+    qemu_mutex_unlock(&pool->lock);
+    qemu_sem_post(&pool->sem);
     return &req->common;
 }
 
@@ -258,32 +275,80 @@
     qemu_coroutine_enter(co->co, NULL);
 }
 
-int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
+int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
+                                       void *arg)
 {
     ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
     assert(qemu_in_coroutine());
-    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
+    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
     qemu_coroutine_yield();
     return tpc.ret;
 }
 
-void thread_pool_submit(ThreadPoolFunc *func, void *arg)
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
 {
-    thread_pool_submit_aio(func, arg, NULL, NULL);
+    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
 }
 
-static void thread_pool_init(void)
+static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
 {
-    QLIST_INIT(&head);
-    event_notifier_init(&notifier, false);
-    qemu_mutex_init(&lock);
-    qemu_cond_init(&check_cancel);
-    qemu_sem_init(&sem, 0);
-    qemu_aio_set_event_notifier(&notifier, event_notifier_ready,
-                                thread_pool_active);
+    if (!ctx) {
+        ctx = qemu_get_aio_context();
+    }
 
-    QTAILQ_INIT(&request_list);
-    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
+    memset(pool, 0, sizeof(*pool));
+    event_notifier_init(&pool->notifier, false);
+    pool->ctx = ctx;
+    qemu_mutex_init(&pool->lock);
+    qemu_cond_init(&pool->check_cancel);
+    qemu_cond_init(&pool->worker_stopped);
+    qemu_sem_init(&pool->sem, 0);
+    pool->max_threads = 64;
+    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
+
+    QLIST_INIT(&pool->head);
+    QTAILQ_INIT(&pool->request_list);
+
+    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
+                           thread_pool_active);
 }
 
-block_init(thread_pool_init)
+ThreadPool *thread_pool_new(AioContext *ctx)
+{
+    ThreadPool *pool = g_new(ThreadPool, 1);
+    thread_pool_init_one(pool, ctx);
+    return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+    if (!pool) {
+        return;
+    }
+
+    assert(QLIST_EMPTY(&pool->head));
+
+    qemu_mutex_lock(&pool->lock);
+
+    /* Stop new threads from spawning */
+    qemu_bh_delete(pool->new_thread_bh);
+    pool->cur_threads -= pool->new_threads;
+    pool->new_threads = 0;
+
+    /* Wait for worker threads to terminate */
+    pool->stopping = true;
+    while (pool->cur_threads > 0) {
+        qemu_sem_post(&pool->sem);
+        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
+    }
+
+    qemu_mutex_unlock(&pool->lock);
+
+    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
+    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->check_cancel);
+    qemu_cond_destroy(&pool->worker_stopped);
+    qemu_mutex_destroy(&pool->lock);
+    event_notifier_cleanup(&pool->notifier);
+    g_free(pool);
+}
diff --git a/trace-events b/trace-events
index d6a847d..cd73b7f 100644
--- a/trace-events
+++ b/trace-events
@@ -115,8 +115,8 @@
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
 
 # thread-pool.c
-thread_pool_submit(void *req, void *opaque) "req %p opaque %p"
-thread_pool_complete(void *req, void *opaque, int ret) "req %p opaque %p ret %d"
+thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
+thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
 thread_pool_cancel(void *req, void *opaque) "req %p opaque %p"
 
 # posix-aio-compat.c
diff --git a/util/qemu-option.c b/util/qemu-option.c
index 5a1d03c..8b74bf1 100644
--- a/util/qemu-option.c
+++ b/util/qemu-option.c
@@ -1067,6 +1067,40 @@
 }
 
 /*
+ * Moves every QDict entry that has a matching option description into the
+ * QemuOpts and removes it from the QDict; all other entries are left alone.
+ * If adding an entry fails, sets @errp and stops; the rest stays in the QDict.
+ */
+void qemu_opts_absorb_qdict(QemuOpts *opts, QDict *qdict, Error **errp)
+{
+    const QDictEntry *entry, *next;
+
+    entry = qdict_first(qdict);
+
+    while (entry != NULL) {
+        Error *local_err = NULL;
+        OptsFromQDictState state = {
+            .errp = &local_err,
+            .opts = opts,
+        };
+
+        next = qdict_next(qdict, entry);
+
+        if (find_desc_by_name(opts->list->desc, entry->key)) {
+            qemu_opts_from_qdict_1(entry->key, entry->value, &state);
+            if (error_is_set(&local_err)) {
+                error_propagate(errp, local_err);
+                return;
+            } else {
+                qdict_del(qdict, entry->key);
+            }
+        }
+
+        entry = next;
+    }
+}
+
+/*
  * Convert from QemuOpts to QDict.
  * The QDict values are of type QString.
  * TODO We'll want to use types appropriate for opt->desc->type, but