/usr/lib/python2.7/dist-packages/oslo_db/sqlalchemy/migration_cli/manager.py is in python-oslo.db 4.13.3-3.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 | # Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
from stevedore import enabled
from oslo_db import exception
MIGRATION_NAMESPACE = 'oslo.db.migration'
def check_plugin_enabled(ext):
"""Used for EnabledExtensionManager."""
return ext.obj.enabled
class MigrationManager(object):
def __init__(self, migration_config, engine=None):
if engine is None:
if migration_config.get('db_url'):
engine = sqlalchemy.create_engine(
migration_config['db_url'],
poolclass=sqlalchemy.pool.NullPool,
)
else:
raise ValueError('Either database url or engine'
' must be provided.')
self._manager = enabled.EnabledExtensionManager(
MIGRATION_NAMESPACE,
check_plugin_enabled,
invoke_args=(engine, migration_config),
invoke_on_load=True
)
if not self._plugins:
raise ValueError('There must be at least one plugin active.')
@property
def _plugins(self):
return sorted(ext.obj for ext in self._manager.extensions)
def upgrade(self, revision):
"""Upgrade database with all available backends."""
# a revision exists only in a single plugin. Until we reached it, we
# should upgrade to the plugins' heads.
# revision=None is a special case meaning latest revision.
rev_in_plugins = [p.has_revision(revision) for p in self._plugins]
if not any(rev_in_plugins) and revision is not None:
raise exception.DBMigrationError('Revision does not exist')
results = []
for plugin, has_revision in zip(self._plugins, rev_in_plugins):
if not has_revision or revision is None:
results.append(plugin.upgrade(None))
else:
results.append(plugin.upgrade(revision))
break
return results
def downgrade(self, revision):
"""Downgrade database with available backends."""
# a revision exists only in a single plugin. Until we reached it, we
# should upgrade to the plugins' first revision.
# revision=None is a special case meaning initial revision.
rev_in_plugins = [p.has_revision(revision) for p in self._plugins]
if not any(rev_in_plugins) and revision is not None:
raise exception.DBMigrationError('Revision does not exist')
# downgrading should be performed in reversed order
results = []
for plugin, has_revision in zip(reversed(self._plugins),
reversed(rev_in_plugins)):
if not has_revision or revision is None:
results.append(plugin.downgrade(None))
else:
results.append(plugin.downgrade(revision))
break
return results
def version(self):
"""Return last version of db."""
last = None
for plugin in self._plugins:
version = plugin.version()
if version is not None:
last = version
return last
def revision(self, message, autogenerate):
"""Generate template or autogenerated revision."""
# revision should be done only by last plugin
return self._plugins[-1].revision(message, autogenerate)
def stamp(self, revision):
"""Create stamp for a given revision."""
return self._plugins[-1].stamp(revision)
|