Python中数据库操作pymysql和 sqlalchemy

2024-05-16 06:12

本文主要是介绍Python中数据库操作pymysql和 sqlalchemy,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在python中操作mysql数据库,主要用到两个库,pymysql和 sqlalchemy。分别进行介绍

安装

安装没啥好说的,其实就是pip install就完事

pip install pymysql
pip install sqlalchemy

pymsql操作数据库

创建连接

以下语句省略了import语句,请自行import

要操作数据库,首先需要创建和数据库的连接,然后才能进程CRUD的操作。

# pymysql用connect方法进行连接
conn = pymysql.connect(host="192.168.32.11", port=3306,user="hellokitty", password="123123",database="hrs", charset="utf8mb4")

插入数据Create

no = int(input('部门编号: '))
name = input('部门名称: ')
location = input('部门所在地: ')
try:  # 2.获取游标对象with conn.cursor() as cursor:# 3.通过游标对象对数据库服务器发出sql语句affected_rows = cursor.execute(f"insert into `tb_dept` values (%s,%s,%s)",(no, name, location))if affected_rows == 1:print("新增部门成功")# 4.提交conn.commit()
except pymysql.MySQLError as err:# 4.回滚conn.rollback()print(type(err), err)
finally:# 5.关闭连接conn.close()

插入多组可以参考如下:

with conn.cursor() as cursor:affected_rows = cursor.executemany('insert into `tb_dept` values (%s, %s, %s)',[(no, name, location),(no1, name1, location1),(no2, name2, location2)])if affected_rows == 3:print('新增部门成功!!!')conn.commit()

删除delete

该例子中指定了一个参数:autocommit=True,这样SQL代码就会自动提交。实际环境中不建议这样做。

no = int(input("请输入部门编号:"))conn = pymysql.connect(host="192.168.32.11", port=3306,user="hellokitty", password="123123",database="hrs", charset="utf8mb4",autocommit=True)
try:with conn.cursor() as cursor:affected_rows = cursor.execute("delete from `tb_dept` where `dno`=%s",(no,))if affected_rows == 1:print("删除部门成功")
finally:conn.close()

更新update

no = int(input('部门编号: '))
name = input('部门名称: ')
location = input('部门所在地: ')try:with conn.cursor() as cursor:affected_rows = cursor.execute("update `tb_dept` set `dname`=%s, `dloc`=%s where `dno`=%s",(name, location, no))if affected_rows == 1:print("更新部门信息成功")conn.commit()
except pymysql.MySQLError as err:conn.rollback()print(type(err), err)
finally:conn.close()

查询数据

try:with conn.cursor() as cursor:affected_rows = cursor.execute("select `dno`,`dname`,`dloc` from `tb_dept`")row = cursor.fetchone()while row:print(row)row = cursor.fetchone()
except pymysql.MySQLError as err:conn.rollback()print(type(err), err)
finally:conn.close()

本例子中,使用while循环每次用fetchone获取一条数据然后打印。也提供了fetchall方法可以获取到所有的结果,但是不推荐这样做,因为在实际环境中这样对内存的压力很大。

默认拿到的是元组,如果希望拿到列表,需要在连接数据库时指定cursorclass=DictCursor

pymysql的缺陷

通过以上例子应该可以看出来,实际上pymysql操作数据库就是通过执行sql语句来的。通过拼接字符串的方式写好sql语句,然后交给pymysql执行。这种方式的缺点是显而易见的:

  1. sql语句比较长的时候很不方便
  2. 可能会有sql注入的危险
  3. 不太优雅

为了解决这些问题,可以使用sqlalchemy库

使用sqlalchemy库操作数据库

sqlalchemy创建数据库连接

首先用create_engine方法创建数据库引擎,然后再用connect方法连接。这里要稍微注意以下,第一个参数数据库的URI这里,不像pymysql直接输入就可以了,比如说pwd_alchemy = 'abc%401234’,这里实际密码是abc@1234,但不能直接输入@,需转换成%40。

from sqlalchemy import create_engine
# 创建连接
engine = create_engine(DB_URI,  # "mysql+pymysql://{USERNAME}:{pwd_alchemy}@{HOST}:{PORT}/{DATABASE}?charset=utf8mb4",echo=False,  # echo 设为 True 会打印出实际执行的 sql,调试的时候更方便future=True,  # 使用 SQLAlchemy 2.0 API,向后兼容pool_size=5,  # 连接池的大小默认为 5 个,设置为 0 时表示连接无限制pool_recycle=3600,  # 设置时间以限制数据库自动断开
)with engine.connect() as conn:......

执行sql语句

sqlalchemy也可以通过执行sql语句的方式操作数据库,这部分和pymysql区别不大。但是execute方法执行的sql语句,需要用sqlalchemy的text方法进行封装,这一点需要注意。

from sqlalchemy import textsql_text = "select * from tb_keys"
with engine.connect() as conn:result = conn.execute(text(sql_text))# 查询结果result类似生成器, 只能遍历一遍, 遍历第二遍时就是空数据# print(result.all())res = result.all()# result可以遍历,每一行是一个row对象,类似具名元祖(namedtuple),支持以下2种遍历方式
for row in res:print(row.keys_id, row.keys_name, row.keys_count)  # 通过字段名获取# print(row[0], row[1], row[2])  # 通过索引获取

顺便执行多条的语句,也很类似

# with engine.connect() as conn:
#     data = [{"keys_id": 11, "keys_name": 'test1', "keys_count": 1},
#             {"keys_id": 12, "keys_name": 'test2', "keys_count": 1}]
#     conn.execute(
#         text("INSERT INTO tb_keys (keys_id, keys_name, keys_count) VALUES (:keys_id, :keys_name, :keys_count)"),
#         data
#     )
#     # 手动commit
#     conn.commit()

声明式API

接下来重点说明一下sqlalchemy的声明式API。这就相当于直接创建一个Table对象,如下所示。

from sqlalchemy.orm import DeclarativeBase, Sessionclass Base(DeclarativeBase):"""DeclarativeBase无法直接使用,所以要先继承一个Base类"""passclass TableCount(Base):__tablename__ = "tb_keys"keys_id: Mapped[int] = mapped_column(Integer, primary_key=True)keys_name: Mapped[str] = mapped_column(String(30), index=True)keys_count: Mapped[int] = mapped_column(Integer)

我这张表结果很简单,表名是tb_keys,然后id,name,count3个字段,分别是主键、字符串和int类型。

用声明式api进行增删改查

然后是增删改查的例子:

from sqlalchemy import select, update# 用声明式API进行select查找,而不是直接执行sql语句
with Session(engine) as session:stmt = select(TableCount).where(TableCount.keys_count == 1).order_by(TableCount.keys_id)result = session.execute(stmt)# 一般情况下,当选取整个对象的时候,都要用 scalars 方法res2 = result.scalars()for row in res2:print(row.keys_id, row.keys_name, row.keys_count)print("*" * 40)# 查询单个属性,不需要用res3 = session.execute(select(TableCount.keys_name))for row in res3:print(row.keys_name)print("*" * 40)# 查询主键有一个快捷方式,以下查询id是7key_word = session.get(TableCount, 7)print(key_word.keys_name)# 更新数据使用updatestmt = update(TableCount).where(TableCount.keys_name == "护士").values(keys_name="护师").\execution_options(synchronize_session="fetch")session.execute(stmt)# 也可以直接修改值,比如上面获取到的key_word.keys_name = "Nurse"session.commit()# 注意,以下两种方式都能更新count值,但更推荐第二种做法。第一种方式可能会导致竞争更新,race condition(竞态条件# key_word.keys_count += 1key_word.keys_count = TableCount.keys_count + 1session.commit()# 新增# new_word = TableCount()# new_word.keys_count = 0# new_word.keys_name = "Alice"# session.add(new_word)# session.commit()# 删除, 用session.delete 删除,先获取到id,在get到该对象,然后用session.delete删除del_word = session.execute(select(TableCount.keys_id).where(TableCount.keys_name == 'Alice')).fetchone()del_word_id = del_word[0]del_word_obj = session.get(TableCount, del_word_id)session.delete(del_word_obj)session.commit()

完整代码请参考:https://github.com/h-kayotin/hanayo_hr/blob/master/hanayo_hr/db_sqlalchemy.py

这篇关于Python中数据库操作pymysql和 sqlalchemy的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/994097

相关文章

利用Python快速搭建Markdown笔记发布系统

《利用Python快速搭建Markdown笔记发布系统》这篇文章主要为大家详细介绍了使用Python生态的成熟工具,在30分钟内搭建一个支持Markdown渲染、分类标签、全文搜索的私有化知识发布系统... 目录引言:为什么要自建知识博客一、技术选型:极简主义开发栈二、系统架构设计三、核心代码实现(分步解析

基于Python实现高效PPT转图片工具

《基于Python实现高效PPT转图片工具》在日常工作中,PPT是我们常用的演示工具,但有时候我们需要将PPT的内容提取为图片格式以便于展示或保存,所以本文将用Python实现PPT转PNG工具,希望... 目录1. 概述2. 功能使用2.1 安装依赖2.2 使用步骤2.3 代码实现2.4 GUI界面3.效

Python获取C++中返回的char*字段的两种思路

《Python获取C++中返回的char*字段的两种思路》有时候需要获取C++函数中返回来的不定长的char*字符串,本文小编为大家找到了两种解决问题的思路,感兴趣的小伙伴可以跟随小编一起学习一下... 有时候需要获取C++函数中返回来的不定长的char*字符串,目前我找到两种解决问题的思路,具体实现如下:

python连接本地SQL server详细图文教程

《python连接本地SQLserver详细图文教程》在数据分析领域,经常需要从数据库中获取数据进行分析和处理,下面:本文主要介绍python连接本地SQLserver的相关资料,文中通过代码... 目录一.设置本地账号1.新建用户2.开启双重验证3,开启TCP/IP本地服务二js.python连接实例1.

基于Python和MoviePy实现照片管理和视频合成工具

《基于Python和MoviePy实现照片管理和视频合成工具》在这篇博客中,我们将详细剖析一个基于Python的图形界面应用程序,该程序使用wxPython构建用户界面,并结合MoviePy、Pill... 目录引言项目概述代码结构分析1. 导入和依赖2. 主类:PhotoManager初始化方法:__in

Python从零打造高安全密码管理器

《Python从零打造高安全密码管理器》在数字化时代,每人平均需要管理近百个账号密码,本文将带大家深入剖析一个基于Python的高安全性密码管理器实现方案,感兴趣的小伙伴可以参考一下... 目录一、前言:为什么我们需要专属密码管理器二、系统架构设计2.1 安全加密体系2.2 密码强度策略三、核心功能实现详解

Python Faker库基本用法详解

《PythonFaker库基本用法详解》Faker是一个非常强大的库,适用于生成各种类型的伪随机数据,可以帮助开发者在测试、数据生成、或其他需要随机数据的场景中提高效率,本文给大家介绍PythonF... 目录安装基本用法主要功能示例代码语言和地区生成多条假数据自定义字段小结Faker 是一个 python

Python实现AVIF图片与其他图片格式间的批量转换

《Python实现AVIF图片与其他图片格式间的批量转换》这篇文章主要为大家详细介绍了如何使用Pillow库实现AVIF与其他格式的相互转换,即将AVIF转换为常见的格式,比如JPG或PNG,需要的小... 目录环境配置1.将单个 AVIF 图片转换为 JPG 和 PNG2.批量转换目录下所有 AVIF 图

Python通过模块化开发优化代码的技巧分享

《Python通过模块化开发优化代码的技巧分享》模块化开发就是把代码拆成一个个“零件”,该封装封装,该拆分拆分,下面小编就来和大家简单聊聊python如何用模块化开发进行代码优化吧... 目录什么是模块化开发如何拆分代码改进版:拆分成模块让模块更强大:使用 __init__.py你一定会遇到的问题模www.

详解如何通过Python批量转换图片为PDF

《详解如何通过Python批量转换图片为PDF》:本文主要介绍如何基于Python+Tkinter开发的图片批量转PDF工具,可以支持批量添加图片,拖拽等操作,感兴趣的小伙伴可以参考一下... 目录1. 概述2. 功能亮点2.1 主要功能2.2 界面设计3. 使用指南3.1 运行环境3.2 使用步骤4. 核