python---sqlalchemy(一)

2024-08-31 22:18
文章标签 python sqlalchemy

本文主要是介绍python---sqlalchemy(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      • 执行原生SQL语句
      • 创建删除表
      • 操作数据库表
      • scoped_session
      • 增删改查
      • 其他

执行原生SQL语句

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engineengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)def task(arg):# 方式一# conn = engine.raw_connection()# cursor = conn.cursor()# cursor.execute(#     "select * from user "# )# result = cursor.fetchall()# cursor.close()# conn.close()# print(result)# print("------ # 方式二----------------")# cur = engine.execute("select * from user")# result = cur.fetchall()# cur.close()# print(result)# print("------ # 方式三----------------")conn = engine.contextual_connect()with conn:cur = conn.execute("select * from user")result = cur.fetchall()print(result)for i in range(5):t = threading.Thread(target=task, args=(i,))t.start()

方式一输出如下:

(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))

方式二如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

方式三如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

创建删除表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()# ##################### 单表示例 #########################
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)# ##################### 一对多示例 #########################
class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)caption = Column(String(50), default='篮球')class Person(Base):__tablename__ = 'person'nid = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=True)hobby_id = Column(Integer, ForeignKey("hobby.id"))# 与生成表结构无关,仅用于查询方便hobby = relationship("Hobby", backref='pers')# ##################### 多对多示例 #########################class Server2Group(Base):__tablename__ = 'server2group'id = Column(Integer, primary_key=True, autoincrement=True)server_id = Column(Integer, ForeignKey('server.id'))group_id = Column(Integer, ForeignKey('group.id'))class Group(Base):__tablename__ = 'group'id = Column(Integer, primary_key=True)name = Column(String(64), unique=True, nullable=False)# 与生成表结构无关,仅用于查询方便servers = relationship('Server', secondary='server2group', backref='groups')class Server(Base):__tablename__ = 'server'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64), unique=True, nullable=False)def init_db():"""根据类创建数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine)def drop_db():"""根据类删除数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.drop_all(engine)if __name__ == '__main__':drop_db()init_db()

操作数据库表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engineBase = declarative_base()
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)# 每次执行数据库操作时,都需要创建一个session
session = Session()# ############# 执行ORM操作 #############
obj1 = Users(name="safly1")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

scoped_session

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_sessionimport datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:"""
session = scoped_session(Session)# ############# 执行ORM操作 #############
obj1 = Users(name="safly")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

总结:

SQLAlchemy两种创建session的方式:方式一:import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)def task():from sqlalchemy.orm.session import Sessionsession = XXXXXX()data = session.query(models.Classes).all()print(data)session.close()for i in range(10):t = Thread(target=task)t.start()方式二(推荐):import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.orm import scoped_sessionengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)session = scoped_session(XXXXXX)def task():# 1. 原来的session对象 = 执行session.registry()# 2. 原来session对象.querydata = session.query(models.Classes).all()print(data)session.remove()for i in range(10):t = Thread(target=task)t.start()flask-session默认也是使用的第二种方式:scoped_session

增删改查

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()# ################ 添加add\add_all ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)session.add_all([Users(name="wupeiqi"),Users(name="alex"),])
session.commit()
"""# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()"""# ################ 修改 ################"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
"""print("-------r1----------")
r1 = session.query(Users).all()for i in r1:print(i.name)print("-------r2----------")r2 = session.query(Users.name, Users.age).all()
for i in r2:print(i)print("-------r3----------")r3 = session.query(Users).filter(Users.name == "099099").all()for i in r3:print(i.name)print("-------r4----------")
r4 = session.query(Users).filter_by(name='099099').all()for i in r4:print(i.name)print("-------r5----------")
r5 = session.query(Users).filter_by(name='099099').first()print(r5.name)print("-------r6----------")
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=5, name='099099').order_by(Users.id).all()
for i in r6:print(i.name)print("-------r7----------")
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='099099').all()
for i in r7:print(i.name)session.close()

其他

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()################################   条件###############################
ret = session.query(Users).filter(Users.id > 1, Users.name == 'safly').all()ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == '099099').all()ret = session.query(Users).filter(Users.id.in_([1, 3, 4, 10])).all()ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='099099'))).all()from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'xiaowan')).all()ret = session.query(Users).filter(or_(Users.id > 11, Users.name == 'eric')).all()ret = session.query(Users).filter(or_(Users.id > 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()# ################################  通配符###############################
ret = session.query(Users).filter(Users.name.like('09%')).all()ret = session.query(Users).filter(~Users.name.like('09%')).all()#
# ############################### 切片###############################
ret = session.query(Users)[1:4]################################  排序###############################
ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# ############################### 分组###############################
from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.age).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()############################### 连表###############################
# (<__main__.Users object at 0x05934670>, <__main__.Hosts object at 0x05934530>)
# (<__main__.Users object at 0x05934610>, <__main__.Hosts object at 0x05934650>)
ret = session.query(Users, Hosts).filter(Users.id == Hosts.id).all()# for i in ret:
#     print(i)# ??
ret = session.query(Users).join(Hosts).all()
print(ret)
# ret = session.query(Users).join(Hosts, isouter=True).all()################################  组合###############################
# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union(q2).all()
print(ret)# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',), ('safly',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union_all(q2).all()
print(ret)session.close()

这篇关于python---sqlalchemy(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1125141

相关文章

使用Python创建一个能够筛选文件的PDF合并工具

《使用Python创建一个能够筛选文件的PDF合并工具》这篇文章主要为大家详细介绍了如何使用Python创建一个能够筛选文件的PDF合并工具,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录背景主要功能全部代码代码解析1. 初始化 wx.Frame 窗口2. 创建工具栏3. 创建布局和界面控件4

一文详解如何在Python中使用Requests库

《一文详解如何在Python中使用Requests库》:本文主要介绍如何在Python中使用Requests库的相关资料,Requests库是Python中常用的第三方库,用于简化HTTP请求的发... 目录前言1. 安装Requests库2. 发起GET请求3. 发送带有查询参数的GET请求4. 发起PO

Python与DeepSeek的深度融合实战

《Python与DeepSeek的深度融合实战》Python作为最受欢迎的编程语言之一,以其简洁易读的语法、丰富的库和广泛的应用场景,成为了无数开发者的首选,而DeepSeek,作为人工智能领域的新星... 目录一、python与DeepSeek的结合优势二、模型训练1. 数据准备2. 模型架构与参数设置3

Python进行PDF文件拆分的示例详解

《Python进行PDF文件拆分的示例详解》在日常生活中,我们常常会遇到大型的PDF文件,难以发送,将PDF拆分成多个小文件是一个实用的解决方案,下面我们就来看看如何使用Python实现PDF文件拆分... 目录使用工具将PDF按页数拆分将PDF的每一页拆分为单独的文件将PDF按指定页数拆分根据页码范围拆分

Python中常用的四种取整方式分享

《Python中常用的四种取整方式分享》在数据处理和数值计算中,取整操作是非常常见的需求,Python提供了多种取整方式,本文为大家整理了四种常用的方法,希望对大家有所帮助... 目录引言向零取整(Truncate)向下取整(Floor)向上取整(Ceil)四舍五入(Round)四种取整方式的对比综合示例应

python 3.8 的anaconda下载方法

《python3.8的anaconda下载方法》本文详细介绍了如何下载和安装带有Python3.8的Anaconda发行版,包括Anaconda简介、下载步骤、安装指南以及验证安装结果,此外,还介... 目录python3.8 版本的 Anaconda 下载与安装指南一、Anaconda 简介二、下载 An

Python自动化处理手机验证码

《Python自动化处理手机验证码》手机验证码是一种常见的身份验证手段,广泛应用于用户注册、登录、交易确认等场景,下面我们来看看如何使用Python自动化处理手机验证码吧... 目录一、获取手机验证码1.1 通过短信接收验证码1.2 使用第三方短信接收服务1.3 使用ADB读取手机短信1.4 通过API获取

python安装whl包并解决依赖关系的实现

《python安装whl包并解决依赖关系的实现》本文主要介绍了python安装whl包并解决依赖关系的实现,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录一、什么是whl文件?二、我们为什么需要使用whl文件来安装python库?三、我们应该去哪儿下

Python脚本实现图片文件批量命名

《Python脚本实现图片文件批量命名》这篇文章主要为大家详细介绍了一个用python第三方库pillow写的批量处理图片命名的脚本,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言源码批量处理图片尺寸脚本源码GUI界面源码打包成.exe可执行文件前言本文介绍一个用python第三方库pi

Python中多线程和多进程的基本用法详解

《Python中多线程和多进程的基本用法详解》这篇文章介绍了Python中多线程和多进程的相关知识,包括并发编程的优势,多线程和多进程的概念、适用场景、示例代码,线程池和进程池的使用,以及如何选择合适... 目录引言一、并发编程的主要优势二、python的多线程(Threading)1. 什么是多线程?2.