| #!/usr/bin/env python | 
 | # | 
 | # Tests for internal snapshot. | 
 | # | 
 | # Copyright (C) 2013 IBM, Inc. | 
 | # | 
 | # Based on 055. | 
 | # | 
 | # This program is free software; you can redistribute it and/or modify | 
 | # it under the terms of the GNU General Public License as published by | 
 | # the Free Software Foundation; either version 2 of the License, or | 
 | # (at your option) any later version. | 
 | # | 
 | # This program is distributed in the hope that it will be useful, | 
 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | 
 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | 
 | # GNU General Public License for more details. | 
 | # | 
 | # You should have received a copy of the GNU General Public License | 
 | # along with this program.  If not, see <http://www.gnu.org/licenses/>. | 
 | # | 
 |  | 
 | import time | 
 | import os | 
 | import iotests | 
 | from iotests import qemu_img, qemu_io | 
 |  | 
 | test_drv_base_name = 'drive' | 
 |  | 
 | class ImageSnapshotTestCase(iotests.QMPTestCase): | 
 |     image_len = 120 * 1024 * 1024 # MB | 
 |  | 
 |     def __init__(self, *args): | 
 |         self.expect = [] | 
 |         super(ImageSnapshotTestCase, self).__init__(*args) | 
 |  | 
 |     def _setUp(self, test_img_base_name, image_num): | 
 |         self.vm = iotests.VM() | 
 |         for i in range(0, image_num): | 
 |             filename = '%s%d' % (test_img_base_name, i) | 
 |             img = os.path.join(iotests.test_dir, filename) | 
 |             device = '%s%d' % (test_drv_base_name, i) | 
 |             qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len)) | 
 |             self.vm.add_drive(img) | 
 |             self.expect.append({'image': img, 'device': device, | 
 |                                 'snapshots': [], | 
 |                                 'snapshots_name_counter': 0}) | 
 |         self.vm.launch() | 
 |  | 
 |     def tearDown(self): | 
 |         self.vm.shutdown() | 
 |         for dev_expect in self.expect: | 
 |             os.remove(dev_expect['image']) | 
 |  | 
 |     def createSnapshotInTransaction(self, snapshot_num, abort = False): | 
 |         actions = [] | 
 |         for dev_expect in self.expect: | 
 |             num = dev_expect['snapshots_name_counter'] | 
 |             for j in range(0, snapshot_num): | 
 |                 name = '%s_sn%d' % (dev_expect['device'], num) | 
 |                 num = num + 1 | 
 |                 if abort == False: | 
 |                     dev_expect['snapshots'].append({'name': name}) | 
 |                     dev_expect['snapshots_name_counter'] = num | 
 |                 actions.append({ | 
 |                     'type': 'blockdev-snapshot-internal-sync', | 
 |                     'data': { 'device': dev_expect['device'], | 
 |                               'name': name }, | 
 |                 }) | 
 |  | 
 |         if abort == True: | 
 |             actions.append({ | 
 |                 'type': 'abort', | 
 |                 'data': {}, | 
 |             }) | 
 |  | 
 |         result = self.vm.qmp('transaction', actions = actions) | 
 |  | 
 |         if abort == True: | 
 |             self.assert_qmp(result, 'error/class', 'GenericError') | 
 |         else: | 
 |             self.assert_qmp(result, 'return', {}) | 
 |  | 
 |     def verifySnapshotInfo(self): | 
 |         result = self.vm.qmp('query-block') | 
 |  | 
 |         # Verify each expected result | 
 |         for dev_expect in self.expect: | 
 |             # 1. Find the returned image value and snapshot info | 
 |             image_result = None | 
 |             for device in result['return']: | 
 |                 if device['device'] == dev_expect['device']: | 
 |                     image_result = device['inserted']['image'] | 
 |                     break | 
 |             self.assertTrue(image_result != None) | 
 |             # Do not consider zero snapshot case now | 
 |             sn_list_result = image_result['snapshots'] | 
 |             sn_list_expect = dev_expect['snapshots'] | 
 |  | 
 |             # 2. Verify it with expect | 
 |             self.assertTrue(len(sn_list_result) == len(sn_list_expect)) | 
 |  | 
 |             for sn_expect in sn_list_expect: | 
 |                 sn_result = None | 
 |                 for sn in sn_list_result: | 
 |                     if sn_expect['name'] == sn['name']: | 
 |                         sn_result = sn | 
 |                         break | 
 |                 self.assertTrue(sn_result != None) | 
 |                 # Fill in the detail info | 
 |                 sn_expect.update(sn_result) | 
 |  | 
 |     def deleteSnapshot(self, device, id = None, name = None): | 
 |         sn_list_expect = None | 
 |         sn_expect = None | 
 |  | 
 |         self.assertTrue(id != None or name != None) | 
 |  | 
 |         # Fill in the detail info include ID | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |         #find the expected snapshot list | 
 |         for dev_expect in self.expect: | 
 |             if dev_expect['device'] == device: | 
 |                 sn_list_expect = dev_expect['snapshots'] | 
 |                 break | 
 |         self.assertTrue(sn_list_expect != None) | 
 |  | 
 |         if id != None and name != None: | 
 |             for sn in sn_list_expect: | 
 |                 if sn['id'] == id and sn['name'] == name: | 
 |                     sn_expect = sn | 
 |                     result = \ | 
 |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                                       device = device, | 
 |                                       id = id, | 
 |                                       name = name) | 
 |                     break | 
 |         elif id != None: | 
 |             for sn in sn_list_expect: | 
 |                 if sn['id'] == id: | 
 |                     sn_expect = sn | 
 |                     result = \ | 
 |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                                       device = device, | 
 |                                       id = id) | 
 |                     break | 
 |         else: | 
 |             for sn in sn_list_expect: | 
 |                 if sn['name'] == name: | 
 |                     sn_expect = sn | 
 |                     result = \ | 
 |                           self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                                       device = device, | 
 |                                       name = name) | 
 |                     break | 
 |  | 
 |         self.assertTrue(sn_expect != None) | 
 |  | 
 |         self.assert_qmp(result, 'return', sn_expect) | 
 |         sn_list_expect.remove(sn_expect) | 
 |  | 
 | class TestSingleTransaction(ImageSnapshotTestCase): | 
 |     def setUp(self): | 
 |         self._setUp('test_a.img', 1) | 
 |  | 
 |     def test_create(self): | 
 |         self.createSnapshotInTransaction(1) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |     def test_error_name_empty(self): | 
 |         actions = [{'type': 'blockdev-snapshot-internal-sync', | 
 |                     'data': { 'device': self.expect[0]['device'], | 
 |                               'name': '' }, | 
 |                   }] | 
 |         result = self.vm.qmp('transaction', actions = actions) | 
 |         self.assert_qmp(result, 'error/class', 'GenericError') | 
 |  | 
 |     def test_error_device(self): | 
 |         actions = [{'type': 'blockdev-snapshot-internal-sync', | 
 |                     'data': { 'device': 'drive_error', | 
 |                               'name': 'a' }, | 
 |                   }] | 
 |         result = self.vm.qmp('transaction', actions = actions) | 
 |         self.assert_qmp(result, 'error/class', 'DeviceNotFound') | 
 |  | 
 |     def test_error_exist(self): | 
 |         self.createSnapshotInTransaction(1) | 
 |         self.verifySnapshotInfo() | 
 |         actions = [{'type': 'blockdev-snapshot-internal-sync', | 
 |                     'data': { 'device': self.expect[0]['device'], | 
 |                               'name': self.expect[0]['snapshots'][0] }, | 
 |                   }] | 
 |         result = self.vm.qmp('transaction', actions = actions) | 
 |         self.assert_qmp(result, 'error/class', 'GenericError') | 
 |  | 
 | class TestMultipleTransaction(ImageSnapshotTestCase): | 
 |     def setUp(self): | 
 |         self._setUp('test_b.img', 2) | 
 |  | 
 |     def test_create(self): | 
 |         self.createSnapshotInTransaction(3) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |     def test_abort(self): | 
 |         self.createSnapshotInTransaction(2) | 
 |         self.verifySnapshotInfo() | 
 |         self.createSnapshotInTransaction(3, abort = True) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 | class TestSnapshotDelete(ImageSnapshotTestCase): | 
 |     def setUp(self): | 
 |         self._setUp('test_c.img', 1) | 
 |  | 
 |     def test_delete_with_id(self): | 
 |         self.createSnapshotInTransaction(2) | 
 |         self.verifySnapshotInfo() | 
 |         self.deleteSnapshot(self.expect[0]['device'], | 
 |                             id = self.expect[0]['snapshots'][0]['id']) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |     def test_delete_with_name(self): | 
 |         self.createSnapshotInTransaction(3) | 
 |         self.verifySnapshotInfo() | 
 |         self.deleteSnapshot(self.expect[0]['device'], | 
 |                             name = self.expect[0]['snapshots'][1]['name']) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |     def test_delete_with_id_and_name(self): | 
 |         self.createSnapshotInTransaction(4) | 
 |         self.verifySnapshotInfo() | 
 |         self.deleteSnapshot(self.expect[0]['device'], | 
 |                             id = self.expect[0]['snapshots'][2]['id'], | 
 |                             name = self.expect[0]['snapshots'][2]['name']) | 
 |         self.verifySnapshotInfo() | 
 |  | 
 |  | 
 |     def test_error_device(self): | 
 |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                               device = 'drive_error', | 
 |                               id = '0') | 
 |         self.assert_qmp(result, 'error/class', 'DeviceNotFound') | 
 |  | 
 |     def test_error_no_id_and_name(self): | 
 |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                               device = self.expect[0]['device']) | 
 |         self.assert_qmp(result, 'error/class', 'GenericError') | 
 |  | 
 |     def test_error_snapshot_not_exist(self): | 
 |         self.createSnapshotInTransaction(2) | 
 |         self.verifySnapshotInfo() | 
 |         result = self.vm.qmp('blockdev-snapshot-delete-internal-sync', | 
 |                               device = self.expect[0]['device'], | 
 |                               id = self.expect[0]['snapshots'][0]['id'], | 
 |                               name = self.expect[0]['snapshots'][1]['name']) | 
 |         self.assert_qmp(result, 'error/class', 'GenericError') | 
 |  | 
 | if __name__ == '__main__': | 
 |     iotests.main(supported_fmts=['qcow2']) |