【Python系列】SQLAlchemy 基本介绍

2024-09-01 00:20

本文主要是介绍【Python系列】SQLAlchemy 基本介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
img

  • 推荐:kwan 的首页,持续学习,不断总结,共同进步,活到老学到老
  • 导航
    • 檀越剑指大厂系列:全面总结 java 核心技术,jvm,并发编程 redis,kafka,Spring,微服务等
    • 常用开发工具系列:常用的开发工具,IDEA,Mac,Alfred,Git,typora 等
    • 数据库系列:详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 新空间代码工作室:提供各种软件服务,承接各种毕业设计,毕业论文等
    • 懒人运维系列:总结好用的命令,解放双手不香吗?能用一个命令完成绝不用两个操作
    • 数据结构与算法系列:总结数据结构和算法,不同类型针对性训练,提升编程思维,剑指大厂

非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

博客目录

    • 一.基础介绍
      • 1. SQLAlchemy 的起源
      • 2. SQLAlchemy 的核心组件
        • 2.1 核心 SQL 工具包
        • 2.2 ORM 层
      • 3. SQLAlchemy 的优势
        • 3.1 灵活性
        • 3.2 跨数据库支持
        • 3.3 强大的社区支持
    • 二.实战步骤
      • 1.数据库配置
      • 2.model
      • 3.连接配置
      • 4.调用 SQL

一.基础介绍

SQLAlchemy 是一个 Python 的 SQL 工具包和对象关系映射(ORM)工具,它提供了一个高层的 ORM 以及底层的 SQL 表达式语言。SQLAlchemy 是开源的,并且可以在商业和非商业项目中免费使用。它支持多种数据库系统,包括 PostgreSQL、MySQL、SQLite 等。
在这里插入图片描述

1. SQLAlchemy 的起源

SQLAlchemy 最初由 Michael Bayer 在 2005 年创建,目的是提供一个全面的 SQL 工具包和 ORM 解决方案,以满足 Python 社区的需求。随着时间的推移,SQLAlchemy 不断发展和完善,成为了 Python 数据库编程领域中最受欢迎的库之一。

2. SQLAlchemy 的核心组件

2.1 核心 SQL 工具包

SQLAlchemy 的核心 SQL 工具包提供了构建 SQL 查询的功能,它允许开发者以 Pythonic 的方式编写 SQL 语句。这包括了对数据库表的创建、数据的增删改查等操作。

2.2 ORM 层

ORM(Object-Relational Mapping)层是 SQLAlchemy 的另一个重要组成部分,它允许开发者使用 Python 类和对象来表示数据库中的表和行。ORM 层抽象了数据库操作,使得开发者可以不必编写 SQL 语句,而是通过操作 Python 对象来间接地与数据库交互。

3. SQLAlchemy 的优势

3.1 灵活性

SQLAlchemy 提供了灵活的 SQL 构建工具,开发者可以自由地编写 SQL 语句,同时也可以利用 ORM 层提供的抽象来简化数据库操作。

3.2 跨数据库支持

SQLAlchemy 支持多种数据库系统,这意味着开发者可以使用相同的代码库来操作不同的数据库,而不需要为每种数据库编写特定的代码。

3.3 强大的社区支持

由于 SQLAlchemy 的流行,它拥有一个活跃的社区,开发者可以在社区中找到大量的资源和帮助,包括文档、教程和第三方库。

二.实战步骤

1.数据库配置

# 数据库
database:TYPE: mysqlDATABASE_URL: mysql://root:xxx@xxxx:9306/test?serverTimezone=Asia/ShanghaiUSERNAME: rootPASSWORD: xxxHOST: xxxxPORT: 9306DBNAME: testMAX_OVERFLOW: 60POOL_TIMEOUT: 120POOL_SIZE: 30URL_PROPERTY: ?charset=utf8ECHO: True

2.model

from datetime import datetimeimport pytz
from sqlalchemy import String, Column, Text, DateTime, JSON
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, attributesdef get_beijing_now():# 获取当前系统时区return datetime.now(pytz.timezone('Asia/Shanghai'))# 基类
class Base(AsyncAttrs, DeclarativeBase):id: Mapped[int] = mapped_column(primary_key=True)create_time = Column(DateTime, default=get_beijing_now, nullable=False)update_time = Column(DateTime, default=get_beijing_now, onupdate=get_beijing_now, nullable=False)def to_dict(self):"""转为字典输出:return:"""return {c.name: getattr(self, c.name) for c in self.__table__.columns}@repr_generator
class AlchemyEntitySchemas(Base):__tablename__ = "entity_schemas"name = Column(String(255), nullable=False, comment='名称')

3.连接配置

from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.sql import text
from base.config import get_config_key
from urllib.parse import quote_plus as urlquote
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, AsyncEngine, async_sessionmakerclass Database:def __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):try:self.engine = create_engine(url, poolclass=QueuePool, pool_size=pool_size,max_overflow=max_overflow, pool_timeout=pool_timeout,echo=echo, pool_recycle=7200, pool_pre_ping=True, echo_pool=echo)self.Session = sessionmaker(bind=self.engine, expire_on_commit=False, autocommit=False, autoflush=False)print("Database connected successfully.")except SQLAlchemyError as e:print(f"Error connecting to the database: {e}")def get_session(self):return self.Session()@staticmethoddef close_session(_session):_session.close()@staticmethoddef execute_query(query, _session):try:result = _session.execute(query)return result.fetchall()except SQLAlchemyError as e:print(f"Error executing query: {e}")return Nonefinally:Database.close_session(_session)class SyncDatabase:async_engine: AsyncEngine = Noneasync_session = Nonedef __init__(self, url, pool_size=30, pool_timeout=1200, max_overflow=60, echo=False):self.url = urlself.max_overflow = max_overflowself.pool_timeout = pool_timeoutself.pool_size = pool_sizeself.echo = echoself.connect()def connect(self):"""创建数据库引擎和会话类"""try:self.async_engine = create_async_engine(self.url, echo=self.echo, pool_size=pool_size,max_overflow=max_overflow, pool_timeout=pool_timeout,pool_recycle=7200,pool_pre_ping=True, echo_pool=self.echo)self.async_session = async_sessionmaker(bind=self.async_engine, class_=AsyncSession, expire_on_commit=False,autocommit=False, autoflush=False)print("Database connected successfully.")except SQLAlchemyError as e:print(f"Error connecting to the database: {e}")def get_db_url():userName = get_config_key("database", "USERNAME")password = get_config_key("database", "PASSWORD")dbHost = get_config_key("database", "HOST")dbPort = get_config_key("database", "PORT")dbName = get_config_key("database", "DBNAME")urlProperty = get_config_key("database", "URL_PROPERTY")if dbName is None:return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'else:return f'mysql+pymysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'def get_sync_db_url():userName = get_config_key("database", "USERNAME")password = get_config_key("database", "PASSWORD")dbHost = get_config_key("database", "HOST")dbPort = get_config_key("database", "PORT")dbName = get_config_key("database", "DBNAME")urlProperty = get_config_key("database", "URL_PROPERTY")if dbName is None:return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}{urlProperty}'else:return f'mysql+aiomysql://{userName}:{urlquote(password)}@{dbHost}:{dbPort}/{dbName}{urlProperty}'url = get_db_url()
max_overflow = get_config_key("database", "MAX_OVERFLOW")
pool_timeout = get_config_key("database", "POOL_TIMEOUT")
pool_size = get_config_key("database", "POOL_SIZE")
echo = get_config_key("database", "ECHO")# sqlalchemy实际操作对象,导入的时候应该导入这个对象
get_sqlalchemy_db = Database(url, pool_size=pool_size, pool_timeout=pool_timeout, max_overflow=max_overflow, echo=echo)# 异步的
SYNC_DB_URI = get_sync_db_url()
_async_engine = create_async_engine(SYNC_DB_URI, echo=echo, pool_size=pool_size,max_overflow=max_overflow, pool_timeout=pool_timeout, pool_recycle=7200,pool_pre_ping=True, echo_pool=echo)
# 异步IO的 sqlalchemy实际操作对象,导入的时候应该导入这个对象
async_session_factory = async_sessionmaker(bind=_async_engine, class_=AsyncSession, expire_on_commit=False,autocommit=False, autoflush=False)

在这里插入图片描述

4.调用 SQL

@staticmethod
async def find_by_name(name: str):"""根据名称查询"""db = get_sqlalchemy_dbtry:with Session(db.engine) as session:stmt = select(AlchemySchemas)if name:stmt = stmt.where(AlchemySchemas.name == name)schemas_infos = session.scalars(stmt).all()return [schemas_info.to_dict() for schemas_info in schemas_infos] if schemas_infos else Noneexcept SQLAlchemyError as e:print(f"An error occurred: {e}")return Nonefinally:db.close_session(session)

觉得有用的话点个赞 👍🏻 呗。
❤️❤️❤️本人水平有限,如有纰漏,欢迎各位大佬评论批评指正!😄😄😄

💘💘💘如果觉得这篇文对你有帮助的话,也请给个点赞、收藏下吧,非常感谢!👍 👍 👍

🔥🔥🔥Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

img

这篇关于【Python系列】SQLAlchemy 基本介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1125414

相关文章

Python判断for循环最后一次的6种方法

《Python判断for循环最后一次的6种方法》在Python中,通常我们不会直接判断for循环是否正在执行最后一次迭代,因为Python的for循环是基于可迭代对象的,它不知道也不关心迭代的内部状态... 目录1.使用enuhttp://www.chinasem.cnmerate()和len()来判断for

使用Python实现高效的端口扫描器

《使用Python实现高效的端口扫描器》在网络安全领域,端口扫描是一项基本而重要的技能,通过端口扫描,可以发现目标主机上开放的服务和端口,这对于安全评估、渗透测试等有着不可忽视的作用,本文将介绍如何使... 目录1. 端口扫描的基本原理2. 使用python实现端口扫描2.1 安装必要的库2.2 编写端口扫

使用Python实现操作mongodb详解

《使用Python实现操作mongodb详解》这篇文章主要为大家详细介绍了使用Python实现操作mongodb的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、示例二、常用指令三、遇到的问题一、示例from pymongo import MongoClientf

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

四种Flutter子页面向父组件传递数据的方法介绍

《四种Flutter子页面向父组件传递数据的方法介绍》在Flutter中,如果父组件需要调用子组件的方法,可以通过常用的四种方式实现,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录方法 1:使用 GlobalKey 和 State 调用子组件方法方法 2:通过回调函数(Callb

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过

Python调用另一个py文件并传递参数常见的方法及其应用场景

《Python调用另一个py文件并传递参数常见的方法及其应用场景》:本文主要介绍在Python中调用另一个py文件并传递参数的几种常见方法,包括使用import语句、exec函数、subproce... 目录前言1. 使用import语句1.1 基本用法1.2 导入特定函数1.3 处理文件路径2. 使用ex

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3