本文主要介绍如何对 SQLAlchemy + SQLite 做进一步封装,希望能为大家解决编程问题提供一定的参考,需要的开发者可以跟着小编一起学习!
api.py:封装数据库操作(增删改查)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import exc
from sqlalchemy import create_engine
from contextlib import contextmanager
import logging
import datetime

LOG = logging.getLogger()

# Session factory shared by every get_session() call; built on first use so
# that importing this module does not touch the database.
_SESSION_FACTORY = None


@contextmanager
def session_begin(session):
    """Run a unit of work on *session*.

    Yields the session itself. Commits when the ``with`` body finishes
    normally, rolls back and re-raises when it fails, and always closes the
    session on exit.
    """
    try:
        yield session
        session.commit()
    except BaseException:
        # Same reach as a bare ``except:``: roll back on every failure,
        # including KeyboardInterrupt, before propagating it.
        session.rollback()
        raise
    finally:
        session.close()


def _update_values(ref, values):
    """Copy every key/value pair of the *values* mapping onto *ref*."""
    for key, value in values.items():
        setattr(ref, key, value)


def _get_session_factory():
    """Return the shared ``sessionmaker``, creating the engine on first use."""
    global _SESSION_FACTORY
    if _SESSION_FACTORY is None:
        # Imported here so the block does not rely on a new file-level import.
        from sqlalchemy import event

        engine = create_engine(
            'sqlite:///test.db?check_same_thread=False', echo=False)

        @event.listens_for(engine, "connect")
        def _enable_foreign_keys(dbapi_connection, connection_record):
            # SQLite only enforces foreign keys when this is switched on for
            # each new DBAPI connection.
            cursor = dbapi_connection.cursor()
            cursor.execute("PRAGMA foreign_keys=ON")
            cursor.close()

        _SESSION_FACTORY = sessionmaker(bind=engine)
    return _SESSION_FACTORY


def get_session():
    """Return a new session bound to the shared SQLite engine.

    The engine is created once and reused, instead of building a new engine
    (and connection pool) on every call. Foreign-key enforcement is enabled
    on every connection the engine opens.
    """
    return _get_session_factory()()


class TableOperator(object):
    """Create/read/update/delete helper for one mapped table class.

    The table class is expected to provide the ``id``, ``deleted``,
    ``deleted_at`` and ``updated_at`` columns used by the methods below.
    """

    def __init__(self, table_class):
        self.table_class = table_class

    def _get(self, ref_id, session=None, force_show_deleted=False):
        """Return the record whose id is *ref_id*.

        :param ref_id: primary-key value to look up.
        :param session: session to query with; when omitted a temporary
            session is opened and closed again before returning.
        :param force_show_deleted: also match soft-deleted records.
        :raises sqlalchemy.orm.exc.NoResultFound: when no record matches.
        """
        owns_session = session is None
        if owns_session:
            session = get_session()
        try:
            query = session.query(self.table_class).filter_by(id=ref_id)
            if not force_show_deleted:
                query = query.filter_by(deleted=False)
            table_ref = query.first()
            if not table_ref:
                msg = "No table record found with ID %s" % ref_id
                LOG.error(msg)
                raise exc.NoResultFound(msg)
            return table_ref
        finally:
            # Only close a session this method opened itself.
            if owns_session:
                session.close()

    def add(self, values):
        """Insert one record built from the *values* mapping."""
        with session_begin(get_session()) as session:
            table_ref = self.table_class()
            _update_values(table_ref, values)
            session.add(table_ref)

    def add_all(self, values_list):
        """Insert one record per mapping in *values_list* in one transaction."""
        with session_begin(get_session()) as session:
            data_list = []
            for values in values_list:
                table_ref = self.table_class()
                _update_values(table_ref, values)
                data_list.append(table_ref)
            session.add_all(data_list)

    def get(self, filters=None, force_show_deleted=False):
        """Return a list of the records matching *filters*.

        :param filters: mapping of column name to required value; ``None``
            or empty matches every record.
        :param force_show_deleted: also return soft-deleted records.

        The session is closed before returning, so the records are detached
        from it; lazy attributes that were not loaded by the query cannot be
        loaded afterwards.
        """
        filters = filters or {}
        session = get_session()
        try:
            query = session.query(self.table_class).filter_by(**filters)
            if not force_show_deleted:
                query = query.filter_by(deleted=False)
            return query.all()
        finally:
            session.close()

    def destroy(self, ref_id, delete=False):
        """Soft-delete (default) or physically delete the record *ref_id*.

        :param ref_id: primary-key value of the record.
        :param delete: when true the row is removed; otherwise it is only
            flagged ``deleted`` and stamped with ``deleted_at``.
        :returns: the affected record object. NOTE(review): the session has
            been committed and closed by then, so the object is detached.
        :raises sqlalchemy.orm.exc.NoResultFound: when no live record matches.
        """
        with session_begin(get_session()) as session:
            # Looked up inside the transaction scope so the session is
            # rolled back and closed even when the record does not exist.
            table_ref = self._get(ref_id, session=session)
            if not delete:
                table_ref.deleted = True
                table_ref.deleted_at = datetime.datetime.utcnow()
            else:
                session.delete(table_ref)
        return table_ref

    def delete_all(self, filters=None, delete=False):
        """Soft-delete (default) or physically delete every live match.

        Only records whose ``deleted`` flag is false are affected, in both
        modes.
        """
        filters = filters or {}
        with session_begin(get_session()) as session:
            query = session.query(self.table_class).filter_by(**filters) \
                .filter_by(deleted=False)
            if not delete:
                values = {"deleted": True,
                          "deleted_at": datetime.datetime.utcnow()}
                query.update(values, synchronize_session='fetch')
            else:
                query.delete()

    def update(self, ref_id, values):
        """Apply *values* to the live record *ref_id* and return its id.

        :param ref_id: primary-key value of the record.
        :param values: mapping of column name to new value; it is copied,
            the caller's mapping is left untouched.
        :raises ValueError: when *ref_id* or *values* is empty.
        :raises sqlalchemy.orm.exc.NoResultFound: when no live record matches.
        :raises Exception: when the UPDATE statement changes no row.
        """
        if not ref_id or not values:
            msg = "ref_id/values can not be empty"
            LOG.error(msg)
            raise ValueError(msg)
        # Work on a copy so the timestamp is not leaked into the caller's dict.
        values = dict(values)
        values['updated_at'] = datetime.datetime.utcnow()
        with session_begin(get_session()) as session:
            query = session.query(self.table_class).filter_by(id=ref_id) \
                .filter_by(deleted=False)
            table_ref = query.first()
            if not table_ref:
                msg = "No table record found with ID %s" % ref_id
                LOG.error(msg)
                raise exc.NoResultFound(msg)
            # Read the id while the session is still open; after commit and
            # close the instance is expired and detached.
            record_id = table_ref.id
            updated = query.update(values, synchronize_session='fetch')
            if not updated:
                msg = ('update table object %(ref_id)s failed' %
                       {'ref_id': ref_id})
                LOG.error(msg)
                raise Exception(msg)
        return record_id

    def update_all(self, values, filters=None):
        """Apply *values* to every live record matching *filters*.

        *values* is copied before ``updated_at`` is added, so the caller's
        mapping is left untouched.
        """
        filters = filters or {}
        values = dict(values)
        values["updated_at"] = datetime.datetime.utcnow()
        with session_begin(get_session()) as session:
            query = session.query(self.table_class).filter_by(**filters) \
                .filter_by(deleted=False)
            query.update(values, synchronize_session="fetch")
models.py:定义表
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Boolean, String
from sqlalchemy import create_engine
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import Text
from sqlalchemy import ForeignKey
from sqlalchemy.types import TypeDecorator
from sqlalchemy.orm import relationship
import json
import ast
import uuid
import datetime

# Engine used only to create the tables when this module is imported.
engine = create_engine('sqlite:///test.db?check_same_thread=False', echo=False)
Base = declarative_base()


class JSONEncodedDict(TypeDecorator):
    """Represents an immutable structure as a json-encoded string"""

    impl = Text
    # The type carries no per-instance state, so statements that use it are
    # safe to cache (SQLAlchemy 1.4+ warns when this is left unset).
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Serialize *value* to JSON text on the way in; ``None`` stays NULL."""
        if value is not None:
            value = json.dumps(value)
        return value

    def process_result_value(self, value, dialect):
        """Parse the stored JSON text on the way out; NULL stays ``None``."""
        if value is not None:
            value = json.loads(value)
        return value


class TextEncodedList(TypeDecorator):
    """Represents an immutable structure as a list-encoded string"""

    impl = String(256)
    # No per-instance state; safe for the statement cache.
    cache_ok = True

    def process_bind_param(self, value, dialect):
        """Store ``str(value)``; any falsy value is stored as ``'[]'``."""
        if not value:
            value = []
        return str(value)

    def process_result_value(self, value, dialect):
        """Rebuild the Python literal from the stored text; NULL stays ``None``."""
        if value is not None:
            value = ast.literal_eval(value)
        return value


class DBBase(object):
    """Mixin with the bookkeeping columns shared by every table."""

    # Random UUID4 string generated in Python when a row is inserted.
    id = Column(String(36), primary_key=True,
                default=lambda: str(uuid.uuid4()))
    created_at = Column(DateTime,
                        default=lambda: datetime.datetime.utcnow(),
                        nullable=False)
    updated_at = Column(DateTime,
                        default=lambda: datetime.datetime.utcnow(),
                        nullable=True,
                        onupdate=lambda: datetime.datetime.utcnow())
    # Soft-delete marker and timestamp.
    deleted_at = Column(DateTime)
    deleted = Column(Boolean, nullable=False, default=False)


class Tasks(Base, DBBase):
    """A task row with a name, status, description and error message."""

    __tablename__ = 'tasks'

    name = Column(String(20), nullable=False)
    status = Column(String(20), nullable=False, default="init")
    description = Column(String(256), nullable=True, default="")
    message = Column(Text(), default="", comment="error message")


class CurTask(Base, DBBase):
    """Row that references one ``tasks`` row and stores a list of tasks to do."""

    __tablename__ = 'cur_task'

    name = Column(String(20), nullable=False)
    current_task = Column(String(36), ForeignKey('tasks.id'),
                          nullable=False)
    task_to_do = Column(TextEncodedList(), nullable=False)
    # Adds a ``cur_task`` backref collection on Tasks.
    task = relationship('Tasks', backref='cur_task')


# Create any missing tables as soon as the module is imported.
Base.metadata.create_all(engine, checkfirst=True)
这个时候操作表就变得简单了:
import models
import api

# Insert one row into the tasks table through the generic wrapper.
api.TableOperator(models.Tasks).add({"name": "study"})
这篇关于 SQLAlchemy + SQLite 进一步封装的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!