Python ORM數據庫框架Sqlalchemy的使用教程詳解

對象關系映射(Object Relational Mapping,簡稱ORM)模式是一種為瞭解決面向對象與關系數據庫存在的互不匹配的現象的技術。面向對象的開發方法是當今企業級應用開發環境中的主流開發方法,關系數據庫是企業級應用環境中永久存放數據的主流數據存儲系統。對象和關系數據是業務實體的兩種表現形式,業務實體在內存中表現為對象,在數據庫中表現為關系數據。內存中的對象之間存在關聯和繼承關系,而在數據庫中,關系數據無法直接表達多對多關聯和繼承關系。因此,對象-關系映射(ORM)系統一般以中間件的形式存在,主要實現程序對象到關系數據庫數據的映射。

學過java的hibernate框架的那麼這個很好上手,非常簡單 ,他有兩種模式一種純orm另一種模式是支持原生sql這兩種可以混合使用

優點:

  • 簡潔易讀:將數據表抽象為對象(數據模型),更直觀易讀
  • 可移植:封裝瞭多種數據庫引擎,面對多個數據庫,操作基本一致,代碼易維護
  • 更安全:有效避免SQL註入

缺點: 雖然性能稍稍不及原生SQL,但是操作數據庫真的很方便!

官網: https://www.sqlalchemy.org/

概念和數據類型

概念

常見數據類型

安裝

pip install SQLAlchemy

​​​​​​​pip install mysqlclient   # 安裝自己的數據庫客戶端(可以是mysql 可以是oracle)

連接

from sqlalchemy import create_engine
engine = create_engine("mysql://user:password@hostname/dbname?charset=uft8",
            echo=True,
            pool_size=8,
            pool_recycle=60*30
            )

創建好瞭Engine的同時,Pool和Dialect也已經創建好瞭,但是此時並沒有真正與數據庫連接,等到執行具體的語句.connect()等時才會連接到數據庫。

  • echo: 當設置為True時會將orm語句轉化為sql語句打印,一般debug的時候可用
  • pool_size: 連接池的大小,默認為5個,設置為0時表示連接無限制
  • pool_recycle: 設置時間以限制數據庫多久沒連接自動斷開

創建數據庫表類(模型)

ORM的重要特點就是操作類來操作數據庫,現在我們來創建一個類,以常見的用戶表舉例:

from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast

class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 聲明表名
    # primary_key為主鍵,autoincrement為自增, doc 為註釋但是不會在數據庫中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主鍵")
    name = Column(String(64), unique=True, doc="用戶名") # unique=True 為唯一約束會在數據庫中生成索引
    email = Column(String(64), doc="郵箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

上面的SQLalchemyFast.Base是我自己封裝的Base ,用於統一管理所有模型類,可以將Python類和數據庫表關聯映射起來。數據庫表模型類通過__tablename__和表關聯起來,Column表示數據表的列

生成數據庫表

Base = declarative_base()
Base.metadata.create_all(engine)

會自動創建表,如果存在則忽略,執行以上代碼,就會發現在db中創建瞭users表。 前提必須有模型類繼承瞭Base

會話

會話就和打電話一樣,打一次電話就是一個會話,就相當於和數據庫交互一次就是一個會話,一個會話可以運行多個或單個語句,會話結束必須關閉

sqlalchemy中使用session用於創建程序和數據庫之間的會話,所有對象的載入和保存都需要通過session對象 。

通過sessionmaker調用創建一個工廠,並關聯Engine以確保每個session都可以使用該Engine連接資源:

from sqlalchemy.orm import sessionmaker

# 創建session
DbSession = sessionmaker(bind=engine)
session = DbSession()

session的常見操作方法包括:

  • flush:預提交,提交到數據庫文件,還未寫入數據庫文件中 (沒事用)
  • commit:提交瞭一個事務
  • rollback:回滾
  • close:關閉

增刪改查

add_user = UserDB("test", "[email protected]")
session.add(add_user)
session.commit()

session.add()將會把Model加入當前session維護的持久空間(可以從session.dirty看到)中,直到commit時提交到數據庫。

users = session.query(UserDB).filter(UserDB.id=1).all()
for item in users:
  print(item.name)

session.query(Users).filter(UserDB.id=1).update({'name': "Jack"})

session.query(UserDB).filter(UserDB.name == "test").delete()
session.commit()

執行裸sql

session.execute(text(sql), params)
session.commit()

sql: select * from User where id = :id and name = :name

params: {"id":1,"name":"張三"}`

參數名必須和sql語句中的參數名一致

with關閉會話

        DbSession = sessionmaker(bind=engine)
        with DbSession() as conn:
            # 代碼
            conn.commit()

sql建造者模式

需要導入的包

from sqlalchemy import  delete, update, text, select, join, desc, asc

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            where(text("id = :id and name = :name")).\
            group_by(UserDB.id,UserDB.name).\
            having(text("id = :id and name = :name")).\
            order_by(desc("id"),asc("name")).\
            offset(1).limit(10).\
            params(id=1, name="張三")
        print(sql)

以上sql放入到execute裡直接就能跑瞭

多表聯查(隻支持內查詢和左查詢和全查詢)

        sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
            join(BookDB,UserDB.id == BookDB.id).\
            join(alias(BookDB,"b"),text("b.id == b.id"),isouter=True).\
            join(alias(BookDB,"e"),text("e.id == e.id"),full=True). \
            where(text("id = :id and name = :name"))
        print(sql)

封裝的工具

數據庫配置文件database.properties

url=mysql://root:[email protected]:3306/demo?charset=utf8
echo=True # 是否打印sql語句
pool_size=10 # 連接池大小
pool_recycle=1800 # 連接池回收時間
pool_timeout=30 # 連接池超時時間
isolation_level=READ_COMMITTED # 事務隔離級別

工具

from sqlalchemy import create_engine, delete, update, text, alias
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker

from src.file.FileReadAndWrite import FileReadAndWrite
from src.log.Log import Log


class SQLalchemyFast(object):
    Base = declarative_base()
    """
    功能: SQLalchemy工具
    """

    def __init__(self, dbFile):
        file = FileReadAndWrite.readPropertiesFile(dbFile)
        self.engine = create_engine(
            url=file['url'],
            echo=bool(file['echo']),
            pool_size=int(file['pool_size']),
            pool_recycle=int(file['pool_recycle']),
            pool_timeout=int(file['pool_timeout']),
            isolation_level=file['isolation_level'],
        )

        SQLalchemyFast.Base.metadata.create_all(self.engine)  # 創建表,如果表存在則不創建(必須對象繼承Base)

    # 創建會話
    def createSession(self):
        Session = sessionmaker(bind=self.engine)
        return Session()

    # 添加一條數據
    def addData(self, object):
        with self.createSession() as conn:
            conn.add(object)
            conn.commit()

    # 添加多條數據
    def addDataList(self, objectList):
        with self.createSession() as conn:
            conn.add_all(objectList)
            conn.commit()

    # 刪除主鍵id的數據
    def deleteDataById(self, cla, id):
        with self.createSession() as conn:
            conn.query(cla).filter(cla.id == id).delete()
            conn.commit()

    # 刪除指定數據(where是並且的關系,不支持or和其他復雜查詢)
    def deleteDataWhere(self, cla, *where):
        with self.createSession() as conn:
            stmt = delete(cla).where(*where)
            conn.execute(stmt)
            conn.commit()

    # 清空表
    def truncateTable(self, cla):
        with self.createSession() as conn:
            conn.query(cla).delete()
            conn.commit()

    # 更新指定主鍵id的數據
    def updateDataById(self, cla, id, data):
        """
        :param cla:  類(表)
        :param id:  主鍵id
        :param data:  {'key': "value",...}  key為表中的字段名,value為要修改的值
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(cla.id == id).values(data)
            result = conn.execute(stmt)
            conn.commit()
            return result

    # 更新指定條件的數據 (where是並且的關系,不支持or和其他復雜查詢)
    def updateDataWhere(self, cla, data, *where):
        """
        :param cla:  類(表)
        :param data:  {'key': "value",...}  key為表中的字段名,value為要修改的值
        :param where:  過濾條件
        :return:
        """
        with self.createSession() as conn:
            stmt = update(cla).where(*where).values(data)
            conn.execute(stmt)
            conn.commit()

    # 查詢全部數據
    def queryDataAll(self, cla):
        with self.createSession() as conn:
            result = conn.query(cla).all()
        return result

    # 查詢主鍵id的數據
    def queryDataById(self, cla, id):
        with self.createSession() as conn:
            result = conn.query(cla).filter(cla.id == id).first()
        return result

    # 查詢指定數據,不支持分組查詢(因為聚合後的數據無法轉換成對象)
    def queryDataWhere(self, cla,aliasName=None, column=None, where=None,
                       join=None, on=None, left=None, full=None,
                       order="", limit="", offset="", distinct=None, params=None):
        with self.createSession() as conn:
            stmt = select(cla)
            if aliasName:
                stmt = select(alias(cla,aliasName))
            if column:
                stmt = stmt.with_only_columns(text(column)) .select_from(cla)
            if join is not None and on is not None:
                if left:
                    stmt = stmt.join(join, text(on), isouter=True)
                elif full:
                    stmt = stmt.join(join, text(on), full=True)
                else:
                    stmt = stmt.join(join, text(on))
            if where:
                stmt = stmt.where(text(where))
            if order:
                stmt = stmt.order_by(text(order))
            if limit:
                stmt = stmt.limit(limit)
            if offset:
                stmt = stmt.offset(offset)
            if distinct:
                stmt = stmt.distinct()
            result = conn.execute(stmt,params).all()
            result= [row[0] for row in result]
        return result

    # 創建事物(運行多條sql語句 ,function(conn)是一個函數,裡面包含多條sql語句,需要使用原生的sqlalchemy)
    def createTransaction(self, function):
        with  self.createSession() as conn:
            conn.begin()
            try:
                function(conn)
                conn.commit()
            except Exception as e:
                Log.logError(e)
                conn.rollback()

    # 執行sql語句(包括增刪改查,和存儲過程...隻要是sql語句都可以執行)
    def executeSql(self, sql, params=None):
        """
        :param sql:  sql語句  如: "select * from User where id = :id and name = :name "
        :param params:  參數  例如: {"id":1,"name":"張三"} 註意:參數名必須和sql語句中的參數名一致
        發送多個參數時,參數名必須以列表的形式傳入,例如: {"id":["1","2"],"name":"張三"}
        "INSERT INTO some_table (x, y) VALUES (:x, :y)" 參數可以是 [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
        :return:
        """
        with self.createSession() as conn:
            result = conn.execute(text(sql), params)
            conn.commit()
        return result

    # 執行構建sql語句
    def executeSqlBuild(self, sql):
        with self.createSession() as conn:
            result = conn.execute(sql)
            conn.commit()
        return result

測試實體

from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class UserDB(SQLalchemyFast.Base):
    __tablename__ = "User"   # __tablename__ 聲明表名
    # primary_key為主鍵,autoincrement為自增, doc 為註釋但是不會在數據庫中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主鍵")
    name = Column(String(64), unique=True, doc="用戶名") # unique=True 為唯一約束會在數據庫中生成索引
    email = Column(String(64), doc="郵箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
from sqlalchemy import Column, Integer, String

from src.database.SQLalchemyFast import SQLalchemyFast


class BookDB(SQLalchemyFast.Base):
    __tablename__ = "Book"   # __tablename__ 聲明表名
    # primary_key為主鍵,autoincrement為自增, doc 為註釋但是不會在數據庫中生成
    id = Column(Integer, primary_key=True,autoincrement=True,doc="主鍵")
    name = Column(String(64), unique=True, doc="用戶名") # unique=True 為唯一約束會在數據庫中生成索引
    email = Column(String(64), doc="郵箱")
    def __init__(self, name=None, email=None):
        self.name = name
        self.email = email

    def __str__(self):
           return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)

驗證代碼

import unittest
from sqlalchemy import delete, update, text, select, join, desc, asc, alias

from src.database.BookDB import BookDB
from src.database.SQLalchemyFast import SQLalchemyFast
from src.database.UserDB import UserDB
from src.file.FileTool import FileTool


class SQLalchemyFastTest(unittest.TestCase):
    # 測試添加數據
    def test_add(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addData(UserDB("name1", "123456789"))
        db.addData(UserDB("name2", "123456789"))
        db.addData(UserDB("name3", "123456789"))
        db.addData(UserDB("name4", "123456789"))

    # 測試添加多條數據
    def test_addAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.addDataList([UserDB("name111", "123456789"), UserDB("name211", "123456789")])
    # 測試刪除數據
    def test_deleteDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataById(UserDB, 1)

    # 測試條件刪除數據
    def test_deleteWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.deleteDataWhere(UserDB, UserDB.name == "name1", UserDB.email == "123456789")

    # 測試更新數據
    def test_update(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataById(UserDB, 10, {"name": "name31", "email": "123456789"})

    # 測試條件更新數據
    def test_updateFilter(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        db.updateDataWhere(UserDB, {"name": "name33", "email": "123456789"}, UserDB.name == "name2",
                           UserDB.email == "1231")

    # 測試查詢數據
    def test_queryDataAll(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataAll(UserDB)
        for data in data_all:
            print(data)

    # 測試查詢指定id數據
    def test_queryDataById(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data = db.queryDataById(UserDB, 10)
        print(data)

    # 測試條件查詢數據(不支持分組查詢和鏈表查詢)
    def test_queryDataWhere(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        data_all = db.queryDataWhere(UserDB,
                                     where="name like CONCAT(:name,'%')",
                                     order="id desc",
                                     offset=1,
                                     limit=3,
                                     params={"name": "name"})

        # db.queryDataWhere(UserDB,
        #                              where="name like CONCAT(:name,'%')",
        #                              order="id desc",
        #                              offset=1,
        #                              limit=3,
        #                              params={"name": "name"})

        # db.queryDataWhere(UserDB,aliasName="a",
        #                   join=alias(BookDB,"b"),on="a.id == b.id",
        #                   where="a.name like CONCAT(:name,'%')",
        #                   params={"name": "name"})
        for data in data_all:
            print(data)

    # 測試創建事物
    def test_createTransaction(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)

        def test1(conn):
            conn.add(UserDB("name111", "123456789"))
            conn.add(UserDB("name211", "123456789"))
            # raise Exception("test122")
            # conn.add(UserDB("name333", "123456789"))
            # conn.add(UserDB("name444", "123456789"))

        db.createTransaction(test1)

    # 測試執行sql(執行失敗會回滾的(存儲過程,函數))
    def test_executeSql(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # data_all = db.executeSql("select * from User")
        # data_all = db.executeSql("select * from User where name like CONCAT(:name,'%')", params={"name":"name"})
        # for data in data_all:
        #     print(data)

        # 創建存儲過程
        # db.executeSql("CREATE PROCEDURE `test_procedure` \
        # (IN `in_name` VARCHAR(255), IN `in_email` VARCHAR(255)) \
        # BEGIN \
        # INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        # END")

        # 調用存儲過程
        # db.executeSql("call test_procedure(:name, :email)", params={"name": "name555", "email": "email12131"})

        # 創建函數
        # db.executeSql("CREATE FUNCTION `test_function`( `in_name` VARCHAR(255),  `in_email` VARCHAR(255)) \
        #  RETURNS INT(11) \
        #  BEGIN \
        #  DELETE FROM `User`; \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
        #  RETURN 1; \
        #  END")
        # 調用函數
        # data_all = db.executeSql("select test_function(:name, :email)", params={"name": "name5551", "email": "email12131"})



    # 測試sql構造
    def test_executeSqlBuild(self):
        path = FileTool.getProjectPath(subpath="src/database/database.properties")
        db = SQLalchemyFast(path)
        # sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
        #     join(BookDB,UserDB.id == BookDB.id)
        # print(sql)
        # db.executeSqlBuild(sql)

以上就是Python ORM數據庫框架Sqlalchemy的使用教程詳解的詳細內容,更多關於Python Sqlalchemy的資料請關註WalkonNet其它相關文章!

推薦閱讀: