本文主要是介绍opensips 3.5的DB部署,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
opensips 3.X的DB部署方式较之前版本有很大的不同。本文以opensips 3.5 为例,说明部署的过程。
当OpenSIPS安装完成后,需要进一步做什么?最大的可能就是部署配套的DB。因为很多功能离不开它,比如用户鉴权、注册信息持久化、dialog信息维护,等等。
你可以通过opensips-cli来部署db。在部署之前,你需要先安装它,如何安装请参考CentOS8安装opensips-cli-CSDN博客。本人
配置 OpenSIPS CLI
OpenSIPS CLI的配置信息,存储在配置文件里,用自己熟悉的文本编辑上期工具打开它,配置以下选项::
database_schema_path
- (默认为/usr/share/opensips/
)
把它的值设置为:[Install_Path]/share/opensips/
其中[Install_Path]指向自己的opensips安装路径database_url
- 需要连接的DB URL
如果未指定,在部署时会有交互提示)database_name
- (默认为oopensips
)
指定需要连接的数据库名字database_modules
- (缺省安装标准模块的表)
指定需要安装DB的模块,可以指定ALL,把所有表都装上
更多配置信息,请参考:opensips-cli/docs/modules/database.md at master · OpenSIPS/opensips-cli · GitHub
注意: OpenSIPS CLI 按以下顺序查询配置文件: ~/.opensips-cli.cfg
, /etc/opensips-cli.cfg
, /etc/opensips/opensips-cli.cfg
, 此外,你也可以通过-f参数指定自己的配置文件路径。
下面是我的配置文件模板
[default]
#log_level: DEBUG
log_level: INFO
prompt_name: opensips-cli
prompt_intro: Welcome to OpenSIPS Command Line Interface!
prompt_emptyline_repeat_cmd: False
history_file: ~/.opensips-cli.history
history_file_size: 1000
output_type: pretty-print
communication_type: fifo
fifo_file: /tmp/opensips_fifo
database_schema_path:/opt/payne/share/opensips/mysql
database_admin_url: mysql://root:opensipsdb@localhost
database_url: mysql://opensips:opensipsrw@localhost
database_name: opensips
database_modules: ALL
创建数据库
I 准备好上述内容之后,你可以执行下面命令创建数据库:
opensips-cli -x database create
在这之后,如果因为增加模块需要添加新模块的关联表,比如说presence,只需要执行以下命令:
opensips-cli -x database add presence
当然,你也可以指定一个DB名,比如说opensips_test:
opensips-cli -x database create opensips_test
碰到问题:
AttributeError: 'str' object has no attribute '_execute_on_connection'
原因是用的sqlalchemy不支持string型参数,需要用text对象封装。
问题修改代码:
opensipscli/db.py
# all good - it's time to create the databaseif self.dialect == "postgresql":self.__conn.connection.connection.set_isolation_level(0)try:self.__conn.execute("CREATE DATABASE {}".format(self.db_name))self.__conn.connection.connection.set_isolation_level(1)except sqlalchemy.exc.OperationalError as se:logger.error("cannot create database: {}!".format(se))return Falseelif self.dialect != "sqlite":from sqlalchemy import textcreate_database_stmt = text("CREATE DATABASE {}".format(self.db_name))#self.__conn.execute("CREATE DATABASE {}".format(self.db_name))self.__conn.execute(create_database_stmt)
全部的SQL封装都需要修改。下面是修改后的db.py文件,修改后重装安装一下OpenSIPS CLI
#!/usr/bin/env python
##
## This file is part of OpenSIPS CLI
## (see https://github.com/OpenSIPS/opensips-cli).
##
## This program is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program. If not, see <http://www.gnu.org/licenses/>.
##from opensipscli.logger import logger
from opensipscli.config import cfg
import retry:import sqlalchemyfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import Column, Date, Integer, String, Booleanfrom sqlalchemy.orm import sessionmaker, deferred# for now, we use our own make_url(), since Alchemy API is highly unstable# (https://github.com/OpenSIPS/opensips-cli/issues/85)#from sqlalchemy.engine.url import make_urlsqlalchemy_available = Truelogger.debug("SQLAlchemy version: ", sqlalchemy.__version__)try:import sqlalchemy_utilsexcept ImportError:logger.debug("using embedded implementation of SQLAlchemy_Utils")# copied from SQLAlchemy_utils repositoryfrom opensipscli.libs import sqlalchemy_utils
except ImportError:logger.info("sqlalchemy not available!")sqlalchemy_available = FalseSUPPORTED_BACKENDS = ["mysql","postgresql","sqlite","oracle",
]"""
SQLAlchemy: Classes for ORM handling
"""
if sqlalchemy_available:Base = declarative_base()class Roles(Base):"""Postgres: Roles database"""__tablename__ = 'pg_roles'oid = Column(Integer, primary_key=True)rolname = Column(String)rolsuper = deferred(Column(Boolean), group='options')rolinherit = deferred(Column(Boolean), group='options')rolcreaterole = deferred(Column(Boolean), group='options')rolcreatedb = deferred(Column(Boolean), group='options')rolcanlogin = deferred(Column(Boolean), group='options')rolreplication = deferred(Column(Boolean), group='options')rolconnlimit = deferred(Column(Integer))rolpassword = Column(String)rolvaliduntil = deferred(Column(Date))rolbypassrls = deferred(Column(Boolean))rolconfig = deferred(Column(String))def __repr__(self):"""returns a string from an arbitrary object"""return self.shapeclass osdbError(Exception):"""OSDB: error handler"""passclass osdbConnectError(osdbError):"""OSDB: connecton error handler"""passclass osdbArgumentError(osdbError):"""OSDB: argument error handler"""passclass osdbNoSuchModuleError(osdbError):"""OSDB: module error handler"""passclass osdbModuleAlreadyExistsError(osdbError):"""OSDB: module error handler"""passclass osdbAccessDeniedError(osdbError):"""OSDB: module error handler"""passclass DBURL(object):def __init__(self, url):arr = url.split('://')self.drivername = arr[0].strip()if len(arr) != 2 or not self.drivername:raise Exception('Failed to parse RFC 1738 URL')self.username = Noneself.password = Noneself.host = Noneself.port = Noneself.database = Noneurl = arr[1].strip()if not url:returnarr = url.split('/')if len(arr) > 1:self.database = "/".join(arr[1:]).strip()url = arr[0].strip()arr = url.split('@')if len(arr) > 1:# handle user + passwordupass = arr[0].strip().split(':')self.username = upass[0].strip()if len(upass) > 1:self.password = ":".join(upass[1:]).strip()url = arr[1].strip()else:url = arr[0].strip()# handle host + portarr = url.strip().split(':')self.host = arr[0].strip()if len(arr) > 1:self.port = int(arr[1].strip())def __repr__(self):return "{}://{}{}{}{}{}{}".format(self.drivername,self.username or "",":***" if self.username != None and self.password != None else "","@" if self.username != None else "",self.host or "",":" + str(self.port) if self.port != None else "","/" + self.database if self.database != None else "")def __str__(self):return "{}://{}{}{}{}{}{}".format(self.drivername,self.username or "",":" + self.password if self.username != None and self.password != None else "","@" if self.username != None else "",self.host or "",":" + str(self.port) if self.port != None else "","/" + self.database if self.database != None else "")def make_url(url_string):return DBURL(url_string)class osdb(object):"""Class: object store database"""def __init__(self, db_url, db_name):"""constructor"""self.db_url = db_urlself.db_name = db_nameself.dialect = osdb.get_dialect(db_url)self.Session = sessionmaker()self.__engine = Noneself.__conn = None# TODO: do this only for SQLAlchemytry:if self.dialect == "postgresql":self.__engine = sqlalchemy.create_engine(db_url, isolation_level='AUTOCOMMIT')else:self.__engine = sqlalchemy.create_engine(db_url)logger.debug("connecting to %s", db_url)self.__conn = self.__engine.connect().\execution_options(autocommit=True)# connect the Session object to our engineself.Session.configure(bind=self.__engine)# instanciate the Session objectself.__session = self.Session()except sqlalchemy.exc.OperationalError as se:if self.dialect == "mysql":try:if int(se.args[0].split(",")[0].split("(")[2]) in [2006, # MySQL1698, # MariaDB "Access Denied"1044, # MariaDB "DB Access Denied"1045, # MariaDB "Access Denied (Using Password)"]:raise osdbAccessDeniedErrorexcept osdbAccessDeniedError:raiseexcept:logger.error("unexpected parsing exception")elif self.dialect == "postgresql" and \(("authentication" in se.args[0] and "failed" in se.args[0]) or \("no password supplied" in se.args[0])):raise osdbAccessDeniedErrorraise osdbError("unable to connect to the database")except sqlalchemy.exc.NoSuchModuleError:raise osdbError("cannot handle {} dialect".format(self.dialect))except sqlalchemy.exc.ArgumentError:raise osdbArgumentError("bad DB URL: {}".format(self.db_url))def alter_role(self, role_name, role_options=None, role_password=None):"""alter attributes of a role object"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")return Falseif not role_options is None:sqlcmd = "ALTER ROLE {} WITH {}".format(role_name, role_options)msg = "Alter role '{}' with options '{}'". \format(role_name, role_options, self.db_name)if not role_password is None:sqlcmd += " PASSWORD '{}'".format(role_password)msg += " and password '********'"msg += " on database '{}'".format(self.db_name)try:result = self.__conn.execute(sqlcmd)if result:logger.info( "{} was successfull".format(msg))except:logger.error("%s failed", msg)return Falsereturndef connect(self, db_name=None):"""connect to database"""if db_name is not None:self.db_name = db_name# TODO: do this only for SQLAlchemytry:if self.dialect == "postgresql":self.db_url = self.set_url_db(self.db_url, self.db_name)if sqlalchemy_utils.database_exists(self.db_url) is True:engine = sqlalchemy.create_engine(self.db_url, isolation_level='AUTOCOMMIT')if self.__conn:self.__conn.close()self.__conn = engine.connect()# connect the Session object to our engineself.Session.configure(bind=self.__engine)# instanciate the Session objectself.session = self.Session()logger.debug("connected to database URL '%s'", self.db_url)elif self.dialect != "sqlite":from sqlalchemy import text#self.__conn.execute("USE {}".format(self.db_name))self.__conn.execute(text("USE {}".format(self.db_name)))except Exception as e:logger.error("failed to connect to %s", self.db_url)logger.error(e)return Falsereturn Truedef create(self, db_name=None):"""create a database object"""if db_name is None:db_name = self.db_name# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")logger.debug("Create Database '%s' for dialect '%s' ...",self.db_name, self.dialect)# all good - it's time to create the databaseif self.dialect == "postgresql":self.__conn.connection.connection.set_isolation_level(0)try:self.__conn.execute("CREATE DATABASE {}".format(self.db_name))self.__conn.connection.connection.set_isolation_level(1)except sqlalchemy.exc.OperationalError as se:logger.error("cannot create database: {}!".format(se))return Falseelif self.dialect != "sqlite":from sqlalchemy import textcreate_database_stmt = text("CREATE DATABASE {}".format(self.db_name))#self.__conn.execute("CREATE DATABASE {}".format(self.db_name))self.__conn.execute(create_database_stmt)logger.debug("success")return Truedef create_module(self, import_file):"""create a module object"""self.exec_sql_file(import_file)def ensure_user(self, db_url):url = make_url(db_url)if url.password is None:logger.error("database URL does not include a password")return Falseif url.drivername.lower() == "mysql":#sqlcmd = "CREATE USER IF NOT EXISTS '{}' IDENTIFIED BY '{}'".format(# url.username, url.password)from sqlalchemy import textsqlcmd = text("CREATE USER IF NOT EXISTS '{}' IDENTIFIED BY '{}'".format(url.username, url.password))try:result = self.__conn.execute(sqlcmd)if result:logger.info("created user '%s'", url.username)except:logger.error("failed to create user '%s'", url.username)return Falseif url.username == 'root':logger.debug("skipping password change for root user")else:"""Query compatibility facts when changing a MySQL user password:- SET PASSWORD syntax has diverged between MySQL and MariaDB- ALTER USER syntax is not supported in MariaDB < 10.2"""# try MariaDB syntax first#sqlcmd = "SET PASSWORD FOR '{}' = PASSWORD('{}')".format(# url.username, url.password)sqlcmd = text("SET PASSWORD FOR '{}' = PASSWORD('{}')".format(url.username, url.password))try:result = self.__conn.execute(sqlcmd)if result:logger.info("set password '%s%s%s' for '%s' (MariaDB)",url.password[0] if len(url.password) >= 1 else '',(len(url.password) - 2) * '*',url.password[-1] if len(url.password) >= 2 else '',url.username)except sqlalchemy.exc.ProgrammingError as se:try:if int(se.args[0].split(",")[0].split("(")[2]) == 1064:# syntax error! OK, now try Oracle MySQL syntaxsqlcmd = "ALTER USER '{}' IDENTIFIED BY '{}'".format(url.username, url.password)result = self.__conn.execute(sqlcmd)if result:logger.info("set password '%s%s%s' for '%s' (MySQL)",url.password[0] if len(url.password) >= 1 else '',(len(url.password) - 2) * '*',url.password[-1] if len(url.password) >= 2 else '',url.username)except:logger.exception("failed to set password for '%s'", url.username)return Falseexcept:logger.exception("failed to set password for '%s'", url.username)return False#sqlcmd = "GRANT ALL ON {}.* TO '{}'".format(self.db_name, url.username)sqlcmd = text("GRANT ALL ON {}.* TO '{}'".format(self.db_name, url.username))try:result = self.__conn.execute(sqlcmd)if result:logger.info("granted access to user '%s' on DB '%s'",url.username, self.db_name)except:logger.exception("failed to grant access to '%s' on DB '%s'",url.username, self.db_name)return Falsesqlcmd = "FLUSH PRIVILEGES"try:result = self.__conn.execute(sqlcmd)logger.info("flushed privileges")except:logger.exception("failed to flush privileges")return Falseelif url.drivername.lower() == "postgresql":if not self.exists_role(url.username):logger.info("creating role %s", url.username)if not self.create_role(url.username, url.password):logger.error("failed to create role %s", url.username)self.create_role(url.username, url.password, update=True)sqlcmd = "GRANT ALL PRIVILEGES ON DATABASE {} TO {}".format(self.db_name, url.username)logger.info(sqlcmd)try:result = self.__conn.execute(sqlcmd)if result:logger.debug("... OK")except:logger.error("failed to grant ALL to '%s' on db '%s'",url.username, self.db_name)return Falsereturn Truedef create_role(self, role_name, role_password, update=False,role_options="NOCREATEDB NOCREATEROLE LOGIN"):"""create a role object (PostgreSQL secific)"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")if update:sqlcmd = "ALTER USER {} WITH PASSWORD '{}' {}".format(role_name, role_password, role_options)else:sqlcmd = "CREATE ROLE {} WITH {} PASSWORD '{}'".format(role_name, role_options, role_password)logger.info(sqlcmd)try:result = self.__conn.execute(sqlcmd)if result:logger.info("role '{}' with options '{}' created".format(role_name, role_options))except Exception as e:logger.exception(e)logger.error("creation of new role '%s' with options '%s' failed",role_name, role_options)return Falsereturn resultdef delete(self, table, filter_keys=None):"""delete a table object from a database"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")where_str = self.get_where(filter_keys)statement = "DELETE FROM {}{}".format(table, where_str)try:self.__conn.execute(statement)except sqlalchemy.exc.SQLAlchemyError as ex:logger.error("cannot execute query: {}".format(statement))logger.error(ex)return Falsereturn Truedef destroy(self):"""decontructor of a database object"""# TODO: do this only for SQLAlchemyif not self.__conn:returnself.__conn.close()def drop(self):"""drop a database object"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")if self.dialect != "sqlite":database_url = self.set_url_db(self.db_url, self.db_name)else:database_url = 'sqlite:///' + self.db_nametry:sqlalchemy_utils.drop_database(database_url)logger.debug("database '%s' dropped", self.db_name)return Trueexcept sqlalchemy.exc.NoSuchModuleError as me:logger.error("cannot check if database {} exists: {}".format(self.db_name, me))raise osdbError("cannot handle {} dialect".format(self.dialect)) from Nonedef drop_role(self, role_name):"""drop a role object (PostgreSQL specific)"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")return Falselogger.debug("Role '%s' will be dropped", role_name)sqlcmd = "DROP ROLE IF EXISTS {}".format(role_name)try:result = self.__conn.execute(sqlcmd)if result:logger.debug("Role '%s' dropped", role_name)except:logger.error("dropping role '%s' failed", role_name)return Falsereturndef entry_exists(self, table, constraints):"""check for existence of table constraints"""ret = self.find(table, "count(*)", constraints)if ret and ret.first()[0] != 0:return Truereturn Falsedef exec_sql_file(self, sql_file):"""deploy given sql file"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")with open(sql_file, 'r') as f:if sql_file.endswith("-migrate.sql"):try:sql = f.read()# the DELIMITER thingies are only useful to mysql shell clientsql = re.sub(r'DELIMITER .*\n', '', sql)sql = re.sub(r'\$\$', ';', sql)# DROP/CREATE PROCEDURE statements seem to only work separatelysql = re.sub(r'DROP PROCEDURE .*\n', '', sql)self.__conn.execute(sql)except sqlalchemy.exc.IntegrityError as ie:raise osdbError("cannot deploy {} file: {}".format(sql_file, ie)) from Noneelse:from sqlalchemy import textfor sql in f.read().split(";"):sql = sql.strip()if not sql:continuetry:self.__conn.execute(text(sql))except sqlalchemy.exc.IntegrityError as ie:raise osdbModuleAlreadyExistsError("cannot deploy {} file: {}".format(sql_file, ie)) from Nonedef exists(self, db=None):"""check for existence of a database object"""check_db = db if db is not None else self.db_name# TODO: do this only for SQLAlchemyif not self.__conn:return Falseif self.dialect != "sqlite":database_url = self.set_url_db(self.db_url, check_db)else:database_url = 'sqlite:///' + check_dblogger.debug("check database URL '{}'".format(database_url))try:if sqlalchemy_utils.database_exists(database_url):logger.debug("DB '{}' exists".format(check_db))return Trueexcept sqlalchemy.exc.NoSuchModuleError as me:logger.error("cannot check if database {} exists: {}".format(check_db, me))raise osdbError("cannot handle {} dialect".format(self.dialect)) from Nonelogger.debug("DB does not exist")return Falsedef exists_role(self, role_name=None):"""check for existence of a role object (PostgreSQL specific)"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")return Falseif role_name is None:role_name = 'opensips'filter_args = {'rolname': role_name}logger.debug("filter argument: '%s'", filter_args)role_count = self.__session.query(Roles).\filter_by(**filter_args).\count()logger.debug("Number of matching role instances: '%s'", role_count)if role_count >= 1:logger.debug("Role instance '%s' exists", role_name)return Trueelse:logger.debug("Role instance '%s' does not exist", role_name)return Falsedef find(self, table, fields, filter_keys):"""match fields in a given table"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")if not fields:fields = ['*']elif type(fields) != list:fields = [ fields ]where_str = self.get_where(filter_keys)statement = "SELECT {} FROM {}{}".format(", ".join(fields),table,where_str)try:result = self.__conn.execute(statement)except sqlalchemy.exc.SQLAlchemyError as ex:logger.error("cannot execute query: {}".format(statement))logger.error(ex)return Nonereturn resultdef get_dialect(url):"""extract database dialect from an url"""return url.split('://')[0]def get_where(self, filter_keys):"""construct a sql 'where clause' from given filter keys"""if filter_keys:where_str = ""for k, v in filter_keys.items():where_str += " AND {} = ".format(k)if type(v) == int:where_str += str(v)else:where_str += "'{}'".format(v.translate(str.maketrans({'\'': '\\\''})))if where_str != "":where_str = " WHERE " + where_str[5:]else:where_str = ""return where_strdef get_role(self, role_name="opensips"):"""get attibutes of a role object (PostgreSQL specific)"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")return False# query elements for the given rolerole_element = self.__session.query(Roles).\filter(Roles.rolname == role_name).all()# create a dictionary and output the key-value pairsfor row in role_element:#print ("role: ", row.rolname, "(password:", row.rolpassword, "canlogin:", row.rolcanlogin, ")")dict = self.row2dict(row)for key in sorted(dict, key=lambda k: dict[k], reverse=True):print (key + ": " + dict[key])logger.debug("role_elements: %s", dict)def grant_db_options(self, role_name, on_statement, privs="ALL PRIVILEGES"):"""assign attibutes to a role object (PostgreSQL specific)"""# TODO: is any other dialect using the "role" concept?if self.dialect != "postgresql":return False# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")sqlcmd = "GRANT {} {} TO {}".format(privs, on_statement, role_name)logger.info(sqlcmd)try:self.__conn.execute(sqlcmd)except Exception as e:logger.exception(e)logger.error("failed to grant '%s' '%s' to '%s'", privs, on_statement, role_name)return Falsereturn Truedef grant_public_schema(self, role_name):self.grant_db_options(role_name, "ON SCHEMA public")def grant_table_options(self, role, table, privs="ALL PRIVILEGES"):self.grant_db_options(role, "ON TABLE {}".format(table))def has_sqlalchemy():"""check for usability of the SQLAlchemy modules"""return sqlalchemy_availabledef has_dialect(dialect):"""check for support of a given database dialect via SQLAlchemy"""# TODO: do this only for SQLAlchemytry:sqlalchemy.create_engine('{}://'.format(dialect))except sqlalchemy.exc.NoSuchModuleError:return Falsereturn Truedef insert(self, table, keys):"""insert values into table"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")values = ""for v in keys.values():values += ", "if type(v) == int:values += velse:values += "'{}'".format(v.translate(str.maketrans({'\'': '\\\''})))statement = "INSERT INTO {} ({}) VALUES ({})".format(table, ", ".join(keys.keys()), values[2:])try:result = self.__conn.execute(statement)except sqlalchemy.exc.SQLAlchemyError as ex:logger.error("cannot execute query: {}".format(statement))logger.error(ex)return Falsereturn resultdef migrate(self, proc_suffix, migrate_scripts, old_db, new_db, tables=[]):"""migrate from source to destination database using SQL schema files@flavour: values should resemble: '2.4_to_3.0', '3.0_to_3.1'@sp_suffix: stored procedure name suffix, specific to each migration"""if self.dialect != "mysql":logger.error("Table data migration is only supported for MySQL!")returnproc_db_migrate = 'OSIPS_DB_MIGRATE_{}'.format(proc_suffix)proc_tb_migrate = 'OSIPS_TB_COPY_{}'.format(proc_suffix)self.connect(old_db)# separately drop DB/table migration stored procedures if already# present, since there are issues with multiple statements in 1 importtry:self.__conn.execute(sqlalchemy.sql.text("DROP PROCEDURE IF EXISTS {}".format(proc_db_migrate)).execution_options(autocommit=True))self.__conn.execute(sqlalchemy.sql.text("DROP PROCEDURE IF EXISTS {}".format(proc_tb_migrate)).execution_options(autocommit=True))except:logger.exception("Failed to drop migration stored procedures!")for ms in migrate_scripts:logger.debug("Importing {}...".format(ms))self.exec_sql_file(ms)if tables:for tb in tables:logger.info("Migrating {} data... ".format(tb))try:self.__conn.execute(sqlalchemy.sql.text("CALL {}.{}('{}', '{}', '{}')".format(old_db, proc_tb_migrate, old_db, new_db, tb)))except Exception as e:logger.exception(e)logger.error("Failed to migrate '{}' table data, ".format(tb) +"see above errors!")else:try:self.__conn.execute(sqlalchemy.sql.text("CALL {}.{}('{}', '{}')".format(old_db, proc_db_migrate, old_db, new_db)))except Exception as e:logger.exception(e)logger.error("Failed to migrate database!")print("Finished copying OpenSIPS table data " +"into database '{}'!".format(new_db))def row2dict(self, row):"""convert SQL table row to python dict"""dict = {}for column in row.__table__.columns:dict[column.name] = str(getattr(row, column.name))return dictdef update(self, table, update_keys, filter_keys=None):"""update table"""# TODO: do this only for SQLAlchemyif not self.__conn:raise osdbError("connection not available")update_str = ""for k, v in update_keys.items():update_str += ", {} = ".format(k)if type(v) == int:update_str += velse:update_str += "'{}'".format(v.translate(str.maketrans({'\'': '\\\''})))where_str = self.get_where(filter_keys)statement = "UPDATE {} SET {}{}".format(table,update_str[2:], where_str)try:result = self.__conn.execute(statement)except sqlalchemy.exc.SQLAlchemyError as ex:logger.error("cannot execute query: {}".format(statement))logger.error(ex)return Falsereturn result@staticmethoddef get_db_engine():if cfg.exists('database_admin_url'):engine = osdb.get_url_driver(cfg.get('database_admin_url'))elif cfg.exists('database_url'):engine = osdb.get_url_driver(cfg.get('database_url'))else:engine = "mysql"if engine not in SUPPORTED_BACKENDS:logger.error("bad database engine ({}), supported: {}".format(engine, " ".join(SUPPORTED_BACKENDS)))return Nonereturn engine@staticmethoddef get_db_host():if cfg.exists('database_admin_url'):return osdb.get_url_host(cfg.get('database_admin_url'))elif cfg.exists('database_url'):return osdb.get_url_host(cfg.get('database_url'))return "localhost"@staticmethoddef set_url_db(url, db):"""Force a given database @url string to include the given @db.Args:url (str): the URL to change the DB for.db (str): the name of the database to set. If None, the databasepart will be removed from the URL."""at_idx = url.find('@')if at_idx < 0:logger.error("Bad database URL: {}, missing host part".format(url))return Nonedb_idx = url.find('/', at_idx)if db_idx < 0:if db is None:return urlreturn url + '/' + dbelse:if db is None:return url[:db_idx]return url[:db_idx+1] + db@staticmethoddef set_url_driver(url, driver):return driver + url[url.find(':'):]@staticmethoddef set_url_password(url, password):url = make_url(url)url.password = passwordreturn str(url)@staticmethoddef set_url_host(url, host):url = make_url(url)url.host = hostreturn str(url)@staticmethoddef get_url_driver(url, capitalize=False):if capitalize:driver = make_url(url).drivername.lower()capitalized = {'mysql': 'MySQL','postgresql': 'PostgreSQL','sqlite': 'SQLite','oracle': 'Oracle',}return capitalized.get(driver, driver.capitalize())else:return make_url(url).drivername.lower()@staticmethoddef get_url_user(url):return make_url(url).username@staticmethoddef get_url_pswd(url):return make_url(url).password@staticmethoddef get_url_host(url):return make_url(url).host
这篇关于opensips 3.5的DB部署的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!