Trove源码分析

Trove代码版本:Kilo

Trove数据库代码

Trove相关数据库表格的代码在:trove/db/sqlalchemy/migrate_repo/versions/目录下:

1
2
3
4
5
001_base_schema.py
002_service_images.py
...
032_clusters.py
033_datastore_parameters.py

创建表格函数:create_tables(...)

Trove Create

trove/instance/service.py

1
2
3
4
5
6
7
8
9
class InstanceController(wsgi.Controller):
def create(self, req, body, tenant_id):
instance = models.Instance.create(context, name, flavor_id,
image_id, databases, users,
datastore, datastore_version,
volume_size, backup_id,
availability_zone, nics,
configuration, slave_of_id,
replica_count=replica_count)

trove/instance/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class Instance(BuiltInstance):
def create(cls, context, name, flavor_id, image_id, databases, users,
datastore, datastore_version, volume_size, backup_id,
availability_zone=None, nics=None, configuration_id=None,
slave_of_id=None, cluster_config=None, replica_count=None):
client = create_nova_client(context)
...
def _create_resources():
for instance_index in range(0, instance_count):
db_info = DBInstance.create(name=name, flavor_id=flavor_id,
tenant_id=context.tenant,
volume_size=volume_size,
datastore_version_id=
datastore_version.id,
task_status=InstanceTasks.BUILDING,
configuration_id=configuration_id,
slave_of_id=slave_of_id,
cluster_id=cluster_id,
shard_id=shard_id,
type=instance_type)
...
task_api.API(context).create_instance(
instance_id, instance_name, flavor, image_id, databases, users,
datastore_version.manager, datastore_version.packages,
volume_size, backup_id, availability_zone, root_password,
nics, overrides, slave_of_id, cluster_config)

return SimpleInstance(context, db_info, service_status,
root_password)

trove/taskmanager/api.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class API(object):
def create_instance(self, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id=None,
availability_zone=None, root_password=None,
nics=None, overrides=None, slave_of_id=None,
cluster_config=None):
cctxt = self.client.prepare(version=self.version_cap)
cctxt.cast(self.context, "create_instance",
instance_id=instance_id, name=name,
flavor=self._transform_obj(flavor),
image_id=image_id,
databases=databases,
users=users,
datastore_manager=datastore_manager,
packages=packages,
volume_size=volume_size,
backup_id=backup_id,
availability_zone=availability_zone,
root_password=root_password,
nics=nics,
overrides=overrides,
slave_of_id=slave_of_id,
cluster_config=cluster_config)

trove/taskmanager/manager.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Manager(periodic_task.PeriodicTasks):
def create_instance(self, context, instance_id, name, flavor,
image_id, databases, users, datastore_manager,
packages, volume_size, backup_id, availability_zone,
root_password, nics, overrides, slave_of_id,
cluster_config):
if slave_of_id:
self._create_replication_slave(context, instance_id, name,
flavor, image_id, databases, users,
datastore_manager, packages,
volume_size,
availability_zone, root_password,
nics, overrides, slave_of_id,
backup_id)
else:
if type(instance_id) in [list]:
raise AttributeError(_(
"Cannot create multiple non-replica instances."))
instance_tasks = FreshInstanceTasks.load(context, instance_id)
instance_tasks.create_instance(flavor, image_id, databases, users,
datastore_manager, packages,
volume_size, backup_id,
availability_zone, root_password,
nics, overrides, cluster_config)
timeout = (CONF.restore_usage_timeout if backup_id
else CONF.usage_timeout)
instance_tasks.wait_for_instance(timeout, flavor)

trove/taskmanager/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class FreshInstanceTasks(FreshInstance, NotifyMixin, ConfigurationMixin):
def create_instance(self, flavor, image_id, databases, users,
datastore_manager, packages, volume_size,
backup_id, availability_zone, root_password, nics,
overrides, cluster_config, snapshot=None):
...
if use_heat:
volume_info = self._create_server_volume_heat(
flavor,
image_id,
datastore_manager,
volume_size,
availability_zone,
nics,
files)
elif use_nova_server_volume:
volume_info = self._create_server_volume(
flavor['id'],
image_id,
security_groups,
datastore_manager,
volume_size,
availability_zone,
nics,
files)
else:
volume_info = self._create_server_volume_individually(
flavor['id'],
image_id,
security_groups,
datastore_manager,
volume_size,
availability_zone,
nics,
files)

self._guest_prepare(flavor['ram'], volume_info,
packages, databases, users, backup_info,
config.config_contents, root_password,
config_overrides.config_contents,
cluster_config, snapshot)

def _create_server_volume_individually(self, flavor_id, image_id,
security_groups, datastore_manager,
volume_size, availability_zone,
nics, files):
volume_info = self._build_volume_info(datastore_manager,
volume_size=volume_size)
block_device_mapping = volume_info['block_device']
try:
server = self._create_server(flavor_id, image_id, security_groups,
datastore_manager,
block_device_mapping,
availability_zone, nics, files)
server_id = server.id
# Save server ID.
self.update_db(compute_instance_id=server_id)
...
return volume_info

def _build_volume_info(self, datastore_manager, volume_size=None):
if volume_support:
try:
volume_info = self._create_volume(
volume_size, datastore_manager)
...
return volume_info

def _create_server(self, flavor_id, image_id, security_groups,
datastore_manager, block_device_mapping,
availability_zone, nics, files={}):
server = self.nova_client.servers.create(
name, image_id, flavor_id, files=files, userdata=userdata,
security_groups=security_groups, block_device_mapping=bdmap,
availability_zone=availability_zone, nics=nics,
config_drive=config_drive)
return server

def _guest_prepare(self, flavor_ram, volume_info,
packages, databases, users, backup_info=None,
config_contents=None, root_password=None,
overrides=None, cluster_config=None, snapshot=None):
LOG.debug("Entering guest_prepare")
# Now wait for the response from the create to do additional work
self.guest.prepare(flavor_ram, packages, databases, users,
device_path=volume_info['device_path'],
mount_point=volume_info['mount_point'],
backup_info=backup_info,
config_contents=config_contents,
root_password=root_password,
overrides=overrides,
cluster_config=cluster_config,
snapshot=snapshot)

Trove Delete

trove/instance/service.py

1
2
3
class InstanceController(wsgi.Controller):
def delete(self, req, tenant_id, id):
instance.delete()

trove/instance/models.py

1
2
3
4
class BaseInstance(SimpleInstance):
def delete(self):
def _delete_resources():
task_api.API(self.context).delete_instance(self.id)

trove/taskmanager/api.py

1
2
3
4
class API(object):
def delete_instance(self, instance_id):
cctxt = self.client.prepare(version=self.version_cap)
cctxt.cast(self.context, "delete_instance", instance_id=instance_id)

trove/taskmanager/manager.py

1
2
3
4
5
6
7
8
9
10
class Manager(periodic_task.PeriodicTasks):
def delete_instance(self, context, instance_id):
try:
instance_tasks = models.BuiltInstanceTasks.load(context,
instance_id)
instance_tasks.delete_async()
except exception.UnprocessableEntity:
instance_tasks = models.FreshInstanceTasks.load(context,
instance_id)
instance_tasks.delete_async()

trove/instance/models.py

1
2
3
4
5
6
7
8
9
10
11
12
class BaseInstance(SimpleInstance):
def delete_async(self):
deleted_at = datetime.utcnow()
self._delete_resources(deleted_at)
LOG.debug("Setting instance %s to be deleted.", self.id)
self.update_db(deleted=True, deleted_at=deleted_at,
task_status=InstanceTasks.NONE)
self.set_servicestatus_deleted()
# Delete associated security group
if CONF.trove_security_groups_support:
SecurityGroup.delete_for_instance(self.db_info.id,
self.context)

trove/taskmanager/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
def _delete_resources(self, deleted_at):
server_id = self.db_info.compute_instance_id
old_server = self.nova_client.servers.get(server_id
try:
self.guest.stop_db()
...
try:
if use_heat:
# Delete the server via heat
heatclient = create_heat_client(self.context)
name = 'trove-%s' % self.id
heatclient.stacks.delete(name)
else:
self.server.delete()
...
# If volume has been resized it must be manually removed in cinder
try:
if self.volume_id:
volume_client = create_cinder_client(self.context)
volume = volume_client.volumes.get(self.volume_id)
if volume.status == "available":
LOG.info(_("Deleting volume %(v)s for instance: %(i)s.")
% {'v': self.volume_id, 'i': self.id})
volume.delete()

Trove resize volume

trove/instance/service.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class InstanceController(wsgi.Controller):
def action(self, req, body, tenant_id, id):
_actions = {
'restart': self._action_restart,
'resize': self._action_resize,
'reset_password': self._action_reset_password,
'promote_to_replica_source':
self._action_promote_to_replica_source,
'eject_replica_source': self._action_eject_replica_source,
}

def _action_resize(self, instance, body):
options = {
'volume': self._action_resize_volume,
'flavorRef': self._action_resize_flavor
}

def _action_resize_volume(self, instance, volume):
instance.resize_volume(volume['size'])
return wsgi.Result(None, 202)

trove/instance/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Instance(BuiltInstance):
def resize_volume(self, new_size):
def _resize_resources():
self.validate_can_perform_action()
...
self.update_db(task_status=InstanceTasks.RESIZING)
task_api.API(self.context).resize_volume(new_size, self.id)

if not self.volume_size:
raise exception.BadRequest(_("Instance %s has no volume.")
% self.id)
new_size_l = long(new_size)
validate_volume_size(new_size_l)
return run_with_quotas(self.tenant_id,
{'volumes': new_size_l - self.volume_size},
_resize_resources)

trove/taskmanager/api.py

1
2
3
4
5
6
class API(object):
def resize_volume(self, new_size, instance_id):
cctxt = self.client.prepare(version=self.version_cap)
cctxt.cast(self.context, "resize_volume",
new_size=new_size,
instance_id=instance_id)

trove/taskmanager/manager.py

1
2
3
4
class Manager(periodic_task.PeriodicTasks):
def resize_volume(self, context, instance_id, new_size):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.resize_volume(new_size)

trove/taskmanager/models.py

1
2
3
4
class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
def resize_volume(self, new_size):
action = ResizeVolumeAction(self, self.volume_size, new_size)
action.execute()

trove/taskmanager/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class ResizeVolumeAction(object):
def execute(self):
if self.instance.server.status == InstanceStatus.ACTIVE:
self._resize_active_volume()
self.instance.reset_task_status()
# send usage event for size reported by cinder
volume = self.instance.volume_client.volumes.get(
self.instance.volume_id)
launched_time = timeutils.isotime(self.instance.updated)
modified_time = timeutils.isotime(self.instance.updated)
self.instance.send_usage_event('modify_volume',
old_volume_size=self.old_size,
launched_at=launched_time,
modify_at=modified_time,
volume_size=volume.size)

def _resize_active_volume(self):
LOG.debug("Begin _resize_active_volume for id: %(id)s" % {
'id': self.instance.id})
self._stop_db()
self._unmount_volume(recover_func=self._recover_restart)
self._detach_volume(recover_func=self._recover_mount_restart)
self._extend(recover_func=self._recover_full)
self._verify_extend()
# if anything fails after this point, recovery is futile
self._attach_volume(recover_func=self._fail)
self._resize_fs(recover_func=self._fail)
self._mount_volume(recover_func=self._fail)
self.instance.restart()
LOG.debug("End _resize_active_volume for id: %(id)s" % {
'id': self.instance.id})

Trove resize flavor

trove/instance/service.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class InstanceController(wsgi.Controller):
def action(self, req, body, tenant_id, id):
_actions = {
'restart': self._action_restart,
'resize': self._action_resize,
'reset_password': self._action_reset_password,
'promote_to_replica_source':
self._action_promote_to_replica_source,
'eject_replica_source': self._action_eject_replica_source,
}

def _action_resize(self, instance, body):
options = {
'volume': self._action_resize_volume,
'flavorRef': self._action_resize_flavor
}

def _action_resize_flavor(self, instance, flavorRef):
new_flavor_id = utils.get_id_from_href(flavorRef)
instance.resize_flavor(new_flavor_id)
return wsgi.Result(None, 202)

trove/instance/models.py

1
2
3
4
5
6
7
8
9
class Instance(BuiltInstance):
def resize_flavor(self, new_flavor_id):
self.validate_can_perform_action()
...
# Set the task to RESIZING and begin the async call before returning.
self.update_db(task_status=InstanceTasks.RESIZING)
LOG.debug("Instance %s set to RESIZING.", self.id)
task_api.API(self.context).resize_flavor(self.id, old_flavor,
new_flavor)

trove/taskmanager/api.py

1
2
3
4
5
6
7
class API(object):
def resize_flavor(self, instance_id, old_flavor, new_flavor):
cctxt = self.client.prepare(version=self.version_cap)
cctxt.cast(self.context, "resize_flavor",
instance_id=instance_id,
old_flavor=self._transform_obj(old_flavor),
new_flavor=self._transform_obj(new_flavor))

trove/taskmanager/manager.py

1
2
3
4
class Manager(periodic_task.PeriodicTasks):
def resize_flavor(self, context, instance_id, old_flavor, new_flavor):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.resize_flavor(old_flavor, new_flavor)

trove/taskmanager/models.py

1
2
3
4
class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
def resize_flavor(self, old_flavor, new_flavor):
action = ResizeAction(self, old_flavor, new_flavor)
action.execute()

trove/taskmanager/models.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class ResizeAction(ResizeActionBase):
def _initiate_nova_action(self):
self.instance.server.resize(self.new_flavor_id)

class ResizeActionBase(object):
def _assert_processes_are_ok(self):
"""Checks the procs; if anything is wrong, reverts the operation."""
# Tell the guest to turn back on, and make sure it can start.
self._assert_guest_is_ok()
LOG.debug("Nova guest is ok.")
self._assert_datastore_is_ok()
LOG.debug("Datastore is ok.")

def _confirm_nova_action(self):
LOG.debug("Instance %s calling Compute confirm resize..."
% self.instance.id)
self.instance.server.confirm_resize()

def execute(self):
try:
self._assert_datastore_is_offline()
self._perform_nova_action()

def _perform_nova_action(self):
try:
LOG.debug("Initiating nova action")
self._initiate_nova_action()
LOG.debug("Waiting for nova action")
self._wait_for_nova_action()
LOG.debug("Asserting nova status is ok")
self._assert_nova_status_is_ok()
need_to_revert = True
LOG.debug("* * * REVERT BARRIER PASSED * * *")
LOG.debug("Asserting nova action success")
self._assert_nova_action_was_successful()
LOG.debug("Asserting processes are OK")
self._assert_processes_are_ok()
LOG.debug("Confirming nova action")
self._confirm_nova_action()

Trove Backup

trove/backup/models.py

1
2
3
4
class Backup(object):
def create(cls, context, instance, name, description=None, parent_id=None):
def _create_resources():
api.API(context).create_backup(backup_info, instance_id)

trove/guestagent/api.py

1
2
3
class API(object):
def create_backup(self, backup_info):
self._cast("create_backup", self.version_cap, backup_info=backup_info)

trove/taskmanager/api.py

1
2
3
4
5
6
class API(object):
def create_backup(self, backup_info, instance_id):
cctxt = self.client.prepare(version=self.version_cap)
cctxt.cast(self.context, "create_backup",
backup_info=backup_info,
instance_id=instance_id)

trove/taskmanager/manager.py

1
2
3
4
class Manager(periodic_task.PeriodicTasks):
def create_backup(self, context, backup_info, instance_id):
instance_tasks = models.BuiltInstanceTasks.load(context, instance_id)
instance_tasks.create_backup(backup_info)

trove/taskmanager/models.py

1
2
3
4
class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
def create_backup(self, backup_info):
LOG.info(_("Initiating backup for instance %s.") % self.id)
self.guest.create_backup(backup_info)

trove/guestagent/datastore/mysql/manager.py

1
2
3
class Manager(periodic_task.PeriodicTasks):
def create_backup(self, context, backup_info):
backup.backup(context, backup_info)

trove/guestagent/backup/init.py

1
2
def backup(context, backup_info):
return AGENT.execute_backup(context, backup_info)

trove/guestagent/backup/backupagent.py

1
2
3
4
5
6
7
8
9
10
class BackupAgent(object):
def execute_backup(self, context, backup_info,
runner=RUNNER, extra_opts=EXTRA_OPTS,
incremental_runner=INCREMENTAL_RUNNER):
storage = get_storage_strategy(
CONF.storage_strategy,
CONF.storage_namespace)(context)
...
self.stream_backup_to_storage(backup_info, runner, storage,
parent_metadata, extra_opts)
1
2
3
4
5
6
7
8
mysql.backup_incremental_strategy = {'InnoBackupEx': 'InnoBackupExIncremental'}
mysql.backup_namespace = trove.guestagent.strategies.backup.mysql_impl
mysql.backup_strategy = InnoBackupEx
mysql.device_path = /dev/vdb
mysql.mount_point = /var/lib/mysql
mysql.replication_namespace = trove.guestagent.strategies.replication.mysql_gtid
mysql.replication_strategy = MysqlGTIDReplication
mysql.restore_namespace = trove.guestagent.strategies.restore.mysql_impl
支持原创