python---sqlalchemy(一)

2024-08-31 22:18
文章标签 python sqlalchemy

本文主要是介绍python---sqlalchemy(一),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

      • 执行原生SQL语句
      • 创建删除表
      • 操作数据库表
      • scoped_session
      • 增删改查
      • 其他

执行原生SQL语句

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engineengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)def task(arg):# 方式一# conn = engine.raw_connection()# cursor = conn.cursor()# cursor.execute(#     "select * from user "# )# result = cursor.fetchall()# cursor.close()# conn.close()# print(result)# print("------ # 方式二----------------")# cur = engine.execute("select * from user")# result = cur.fetchall()# cur.close()# print(result)# print("------ # 方式三----------------")conn = engine.contextual_connect()with conn:cur = conn.execute("select * from user")result = cur.fetchall()print(result)for i in range(5):t = threading.Thread(target=task, args=(i,))t.start()

方式一输出如下:

(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))
(('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3))

方式二如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

方式三如下:

[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]
[('safly', '123', 1), ('saf', '123', 2), ('alex', '123', 3)]

创建删除表

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()# ##################### 单表示例 #########################
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)# ##################### 一对多示例 #########################
class Hobby(Base):__tablename__ = 'hobby'id = Column(Integer, primary_key=True)caption = Column(String(50), default='篮球')class Person(Base):__tablename__ = 'person'nid = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=True)hobby_id = Column(Integer, ForeignKey("hobby.id"))# 与生成表结构无关,仅用于查询方便hobby = relationship("Hobby", backref='pers')# ##################### 多对多示例 #########################class Server2Group(Base):__tablename__ = 'server2group'id = Column(Integer, primary_key=True, autoincrement=True)server_id = Column(Integer, ForeignKey('server.id'))group_id = Column(Integer, ForeignKey('group.id'))class Group(Base):__tablename__ = 'group'id = Column(Integer, primary_key=True)name = Column(String(64), unique=True, nullable=False)# 与生成表结构无关,仅用于查询方便servers = relationship('Server', secondary='server2group', backref='groups')class Server(Base):__tablename__ = 'server'id = Column(Integer, primary_key=True, autoincrement=True)hostname = Column(String(64), unique=True, nullable=False)def init_db():"""根据类创建数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.create_all(engine)def drop_db():"""根据类删除数据库表:return:"""engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test?charset=utf8",max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置))Base.metadata.drop_all(engine)if __name__ == '__main__':drop_db()init_db()

操作数据库表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engineBase = declarative_base()
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)# 每次执行数据库操作时,都需要创建一个session
session = Session()# ############# 执行ORM操作 #############
obj1 = Users(name="safly1")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

scoped_session

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_sessionimport datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)engine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:"""
session = scoped_session(Session)# ############# 执行ORM操作 #############
obj1 = Users(name="safly")
session.add(obj1)# 提交事务
session.commit()
# 关闭session
session.close()

总结:

SQLAlchemy两种创建session的方式:方式一:import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_engineengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)def task():from sqlalchemy.orm.session import Sessionsession = XXXXXX()data = session.query(models.Classes).all()print(data)session.close()for i in range(10):t = Thread(target=task)t.start()方式二(推荐):import modelsfrom threading import Threadfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy import create_enginefrom sqlalchemy.orm import scoped_sessionengine =create_engine("mysql+pymysql://root:123456@127.0.0.1:3306/s8day128db?charset=utf8",pool_size=2,max_overflow=0)XXXXXX = sessionmaker(bind=engine)session = scoped_session(XXXXXX)def task():# 1. 原来的session对象 = 执行session.registry()# 2. 原来session对象.querydata = session.query(models.Classes).all()print(data)session.remove()for i in range(10):t = Thread(target=task)t.start()flask-session默认也是使用的第二种方式:scoped_session

增删改查

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()# ################ 添加add\add_all ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)session.add_all([Users(name="wupeiqi"),Users(name="alex"),])
session.commit()
"""# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()"""# ################ 修改 ################"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
"""print("-------r1----------")
r1 = session.query(Users).all()for i in r1:print(i.name)print("-------r2----------")r2 = session.query(Users.name, Users.age).all()
for i in r2:print(i)print("-------r3----------")r3 = session.query(Users).filter(Users.name == "099099").all()for i in r3:print(i.name)print("-------r4----------")
r4 = session.query(Users).filter_by(name='099099').all()for i in r4:print(i.name)print("-------r5----------")
r5 = session.query(Users).filter_by(name='099099').first()print(r5.name)print("-------r6----------")
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=5, name='099099').order_by(Users.id).all()
for i in r6:print(i.name)print("-------r7----------")
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='099099').all()
for i in r7:print(i.name)session.close()

其他

import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationshipBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)age = Column(Integer, default=18)email = Column(String(32), unique=True)ctime = Column(DateTime, default=datetime.datetime.now)extra = Column(Text, nullable=True)__table_args__ = (# UniqueConstraint('id', 'name', name='uix_id_name'),# Index('ix_id_name', 'name', 'extra'),)class Hosts(Base):__tablename__ = 'hosts'id = Column(Integer, primary_key=True)name = Column(String(32), index=True)ctime = Column(DateTime, default=datetime.datetime.now)from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import textengine = create_engine("mysql+pymysql://root:root@127.0.0.1:3306/test", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)session = Session()################################   条件###############################
ret = session.query(Users).filter(Users.id > 1, Users.name == 'safly').all()ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == '099099').all()ret = session.query(Users).filter(Users.id.in_([1, 3, 4, 10])).all()ret = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all()ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='099099'))).all()from sqlalchemy import and_, or_ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'xiaowan')).all()ret = session.query(Users).filter(or_(Users.id > 11, Users.name == 'eric')).all()ret = session.query(Users).filter(or_(Users.id > 2,and_(Users.name == 'eric', Users.id > 3),Users.extra != "")).all()# ################################  通配符###############################
ret = session.query(Users).filter(Users.name.like('09%')).all()ret = session.query(Users).filter(~Users.name.like('09%')).all()#
# ############################### 切片###############################
ret = session.query(Users)[1:4]################################  排序###############################
ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# ############################### 分组###############################
from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.age).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).all()ret = session.query(func.max(Users.id),func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) > 2).all()############################### 连表###############################
# (<__main__.Users object at 0x05934670>, <__main__.Hosts object at 0x05934530>)
# (<__main__.Users object at 0x05934610>, <__main__.Hosts object at 0x05934650>)
ret = session.query(Users, Hosts).filter(Users.id == Hosts.id).all()# for i in ret:
#     print(i)# ??
ret = session.query(Users).join(Hosts).all()
print(ret)
# ret = session.query(Users).join(Hosts, isouter=True).all()################################  组合###############################
# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union(q2).all()
print(ret)# [('safly',), ('xiaowan',), ('sdfsf',), ('werwrw',), ('www',), ('wwerwerw',), ('safly',)]
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Hosts.name).filter(Hosts.id < 4)
ret = q1.union_all(q2).all()
print(ret)session.close()

这篇关于python---sqlalchemy(一)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1125141

相关文章

Python判断for循环最后一次的6种方法

《Python判断for循环最后一次的6种方法》在Python中,通常我们不会直接判断for循环是否正在执行最后一次迭代,因为Python的for循环是基于可迭代对象的,它不知道也不关心迭代的内部状态... 目录1.使用enuhttp://www.chinasem.cnmerate()和len()来判断for

使用Python实现高效的端口扫描器

《使用Python实现高效的端口扫描器》在网络安全领域,端口扫描是一项基本而重要的技能,通过端口扫描,可以发现目标主机上开放的服务和端口,这对于安全评估、渗透测试等有着不可忽视的作用,本文将介绍如何使... 目录1. 端口扫描的基本原理2. 使用python实现端口扫描2.1 安装必要的库2.2 编写端口扫

使用Python实现操作mongodb详解

《使用Python实现操作mongodb详解》这篇文章主要为大家详细介绍了使用Python实现操作mongodb的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、示例二、常用指令三、遇到的问题一、示例from pymongo import MongoClientf

使用Python合并 Excel单元格指定行列或单元格范围

《使用Python合并Excel单元格指定行列或单元格范围》合并Excel单元格是Excel数据处理和表格设计中的一项常用操作,本文将介绍如何通过Python合并Excel中的指定行列或单... 目录python Excel库安装Python合并Excel 中的指定行Python合并Excel 中的指定列P

一文详解Python中数据清洗与处理的常用方法

《一文详解Python中数据清洗与处理的常用方法》在数据处理与分析过程中,缺失值、重复值、异常值等问题是常见的挑战,本文总结了多种数据清洗与处理方法,文中的示例代码简洁易懂,有需要的小伙伴可以参考下... 目录缺失值处理重复值处理异常值处理数据类型转换文本清洗数据分组统计数据分箱数据标准化在数据处理与分析过

Python调用另一个py文件并传递参数常见的方法及其应用场景

《Python调用另一个py文件并传递参数常见的方法及其应用场景》:本文主要介绍在Python中调用另一个py文件并传递参数的几种常见方法,包括使用import语句、exec函数、subproce... 目录前言1. 使用import语句1.1 基本用法1.2 导入特定函数1.3 处理文件路径2. 使用ex

Python脚本实现自动删除C盘临时文件夹

《Python脚本实现自动删除C盘临时文件夹》在日常使用电脑的过程中,临时文件夹往往会积累大量的无用数据,占用宝贵的磁盘空间,下面我们就来看看Python如何通过脚本实现自动删除C盘临时文件夹吧... 目录一、准备工作二、python脚本编写三、脚本解析四、运行脚本五、案例演示六、注意事项七、总结在日常使用

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

python管理工具之conda安装部署及使用详解

《python管理工具之conda安装部署及使用详解》这篇文章详细介绍了如何安装和使用conda来管理Python环境,它涵盖了从安装部署、镜像源配置到具体的conda使用方法,包括创建、激活、安装包... 目录pytpshheraerUhon管理工具:conda部署+使用一、安装部署1、 下载2、 安装3

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交