Flask-Sqlalchemy的基本使用詳解

一: 基本使用:

1:環境的安裝:

pip install flask-sqlalchemy
pip install pymysql

2:組件初始化:

2.1: 基本的配置

1: 首先先安裝兩個依賴的包。
2:配置數據庫的連接:app.config[‘SQLALCHEMY_DATABASE_URI’] = “mysql://root:[email protected]:3306/test39”
3:關閉數據庫的跟蹤:app.config[‘SQLALCHEMY_TRACK_MODIFICATIONS’] = False
4:開啟輸出sql語句:app.config[‘SQLALCHEMY_ECHO’] = True
5:兩種處理python2和python3的名字不一致問題。

from flask import Flask
from flask_restful import Api, Resource
from flask_sqlalchemy import SQLAlchemy

import pymysql
pymysql.install_as_MySQLdb()

"""
python2中數據庫客戶端: MySqldb
python3中數據庫客戶端:pymysql
解決方案一:讓python2和python3的包進行轉換。
import pymysql
pymysql.install_as_MySQLdb()

方案二:表示隻使用python3的包,不使用python2的包
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://root:[email protected]:3306/test39"
"""

app = Flask(__name__)
db = SQLAlchemy(app)

# app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://賬號:密碼@數據庫ip地址:端口號/數據庫名"
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:[email protected]:3306/test39"
# app.config['SQLALCHEMY_BINDS'] = {}

# 關閉數據庫修改跟蹤操作[提高性能]:
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# 開啟輸出底層執行的sql語句
app.config['SQLALCHEMY_ECHO'] = True

# 開啟數據庫的自動提交功能[一般不使用]
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

@app.route('/')
def hello_word():
    return "hello, word"

if __name__ == '__main__':
    print(app.url_map)
    app.run(host='0.0.0.0', port= 8000, debug=True)

2.2:結合工廠方法進行配置:

1: 數據庫配置信息存放在環境類中加載。
2:由於數據庫對象和app對象不一定誰先創建,所以可以先創建數據庫對象,等app對象創建之後再進行關聯。
3:進行關聯的函數是:數據庫對象調用自己的init_app()方法。需要傳入app對象。

settings中配置:

# 開發環境
class DevelopmentConfig(BaseConfig):
    """開發環境配置類"""
    DEBUG = True
    # SQL數據庫連接信息
    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:[email protected]:3306/test39"
    # 關閉數據庫修改跟蹤操作 【提高性能】
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # 開啟輸出底層執行sql語句
    SQLALCHEMY_ECHO = True

主模塊:

from flask import Flask, make_response, Response, request, current_app
from settings import config_dict
from flask_sqlalchemy import SQLAlchemy

# 延後加載
# 創建瞭數據庫,此時數據庫對象還沒有跟app關聯
db = SQLAlchemy()

# 定義一個工廠方法:
def create_app(config_name):
    app = Flask(__name__)

    config_class = config_dict[config_name]
    app.config.from_object(config_class)

    app.config.from_envvar('CONFIG', silent=True)

    # 懶加載
    db.init_app(app)
    return app

app = create_app("dev")

@app.route('/login')
def login():
    return ""

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

3:構建模型類:

  • 模型類必須繼承 db.Model, 其中 db 指對應的組件對象
  • 表名默認為類名小寫, 可以通過 __tablename__類屬性 進行修改
  • 類屬性對應字段, 必須是通過 db.Column() 創建的對象
  • 可以通過 create_all() 和 drop_all()方法 來創建和刪除所有模型類對應的表
  • 註意點: 如果沒有給對應字段的類屬性設置default參數, 且添加數據時也沒有給該字段賦值, 則 sqlalchemy會給該字段設置默認值 None

<模型類創建案例>

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相關配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:[email protected]:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 創建組件對象
db = SQLAlchemy(app)


# 構建模型類  類->表  類屬性->字段  實例對象->記錄
class User(db.Model):
    __tablename__ = 't_user'  # 設置表名, 表名默認為類名小寫
    id = db.Column(db.Integer, primary_key=True)  # 設置主鍵, 默認自增
    name = db.Column('username', db.String(20), unique=True)  # 設置字段名 和 唯一約束
    age = db.Column(db.Integer, default=10, index=True)  # 設置默認值約束 和 索引


if __name__ == '__main__':
    # 刪除所有繼承自db.Model的表
    db.drop_all()
    # 創建所有繼承自db.Model的表
    db.create_all()
    app.run(host="0.0.0.0", port=8000, debug=True)

二:數據操作:

1:增加數據:

1:給模型對象設置數據 可以通過 初始化參數 或者 賦值屬性 兩種方式
2:session.add(模型對象) 添加單條數據到會話中, session.add_all(列表) 添加多條數據到會話中
3:sqlalchemy 會 自動創建事務, 並將數據操作包含在事務中, 提交會話時就會提交事務,事務提交失敗會自動回滾。

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相關配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:[email protected]:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 創建組件對象
db = SQLAlchemy(app)


# 構建模型類
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column('username', db.String(20), unique=True)
    age = db.Column(db.Integer, index=True)


@app.route('/')
def index():
    """增加數據"""

    # 1.創建模型對象
    user1 = User(name='zs', age=20)
    # user1.name = 'zs'
    # user1.age = 20

    # 2.將模型對象添加到會話中
    db.session.add(user1)
    # 添加多條記錄
    # db.session.add_all([user1, user2, user3])

    # 3.提交會話 (會提交事務)
    # sqlalchemy會自動創建隱式事務
    # 事務失敗會自動回滾
    db.session.commit()

    return "index"


if __name__ == '__main__':
    db.drop_all()
    db.create_all()
    app.run(host="0.0.0.0", port=8000, debug=True)

2:查詢數據:

1:數據的準備工作:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 配置數據庫連接
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:[email protected]:3306/test39"
# 配置取消數據庫跟蹤
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 配置數據庫輸出SQL語句
app.config['SQLALCHEMY_ECHO'] = True
# 創建數據庫對象
db = SQLAlchemy(app)

class User(db.Model):
    # 指定表名:默認使用類名小寫
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64))
    email = db.Column(db.String(64))
    age = db.Column(db.Integer)

    def __repr__(self):
        return "(%s, %s, %s, %s)"%(self.id, self.name, self.email, self.age)


if __name__ == '__main__':
    # 刪除所有表
    # db.drop_all()
    # # 創建所有表
    # db.create_all()
    # # 添加測試數據
    # user1 = User(name='wang', email='[email protected]', age=20)
    # user2 = User(name='zhang', email='[email protected]', age=33)
    # user3 = User(name='chen', email='[email protected]', age=23)
    # user4 = User(name='zhou', email='[email protected]', age=29)
    # user5 = User(name='tang', email='[email protected]', age=25)
    # user6 = User(name='wu', email='[email protected]', age=25)
    # user7 = User(name='qian', email='[email protected]', age=23)
    # user8 = User(name='liu', email='[email protected]', age=30)
    # user9 = User(name='li', email='[email protected]', age=28)
    # user10 = User(name='sun', email='[email protected]', age=26)
    #
    # # 一次添加多條數據
    # db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
    # db.session.commit()
    app.run(host="0.0.0.0",port=8000, debug=True)

2:進行查詢操作:

	# 1:查詢所有的用戶數據:
    users = User.query.all()
    print(type(users))
    print(users)

    # 2:查詢一共有多少個用戶:
    count = User.query.count()
    print("一共有{}個人".format(count))

    # 3:查詢第一個用戶信息:
    user1 = User.query.first()
    print("第一個用戶的信息是:{}".format(user1))

    # 4:查詢id為4的三種方式:
    #<方案一>:根據id查詢,返回模型類對象
    user4 = User.query.get(4)
    print("第四個用戶的信息是{}".format(user4))

    # <方案二>:等值過濾器 關鍵字實參設置字段值  返回BaseQuery對象
    user4 = User.query.filter_by(id=4).first()
    print("第四個用戶的信息是{}".format(user4))

    # <方案三>:使用復雜過濾器,返回BaseQuery對象
    user4 = User.query.filter(User.id == 4).first()
    print("第四個用戶的信息是{}".format(user4))

    # 5:查詢用戶名字,開始,結尾,包含n的用戶
    user = User.query.filter(User.name.startswith('n')).all()
    print("名字以n開頭的用戶{}".format(user))
    user = User.query.filter(User.name.endswith("n")).all()
    print("名字以n結尾的用戶{}".format(user))
    user = User.query.filter(User.name.contains("n")).all()
    print("名字中包含n的用戶:{}".format(user))
    # 6:查詢名字和郵箱都以li開頭的所有用戶[2種方式]
    users = User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()
    print("查詢名字和郵箱都以li開頭的所有用戶:{}".format(users))

    users = User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()
    print("查詢名字和郵箱都以li開頭的所有用戶:{}".format(users))

    # 7:查詢age是25或者email以com結尾的所有用戶
    users = User.query.filter(or_(User.age==25, User.email.endswith('com'))).all()
    print("age是25或者email以com結尾的所有用戶 : {}".format(users))

    # 8: 查詢名字不等於wang的所有用戶
    users = User.query.filter(User.name != "wang").all()
    print("名字不等於wang的所有用戶: {}".format(users))
    users= User.query.filter(not_(User.name=="wang")).all()
    print("名字不等於wang的所有用戶: {}".format(users))

    # 9: 查詢id是[1, 3, 5, 7, 9]的用戶
    users = User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()
    print("id是[1, 3, 5, 7, 9]的用戶: {}".format(users))

    # 10:所有用戶先按年齡從小到大, 再按id從大到小排序, 取前5個
    users = User.query.order_by(User.age, User.id.desc()).limit(5).all()
    print("所有用戶先按年齡從小到大, 再按id從大到小排序, 取前5個: {}".format(users))
# 11:查詢年齡從小到大第2-5位的數據
    users = User.query.order_by(User.age).offset(1).limit(4).all()
    print("查詢年齡從小到大第2-5位的數據: {}".format(users))

    # 12: 分頁查詢, 每頁3個, 查詢第2頁的數據  paginate(頁碼, 每頁條數)
    pn = User.query.paginate(2, 3)
    print("總頁數是:", pn.pages)
    print("當前頁:", pn.page)
    print("當前頁的數據:", pn.items)
    print("當前頁的總條數", pn.total)

    # 13: 查詢每個年齡段的人數:(分組聚合)
    data = db.session.query(User.age, func.count(User.id)).group_by(User.age).all()
    for item in data:
        print(item[0], item[1])
    # 註意可以給列起別名,但是windows下會報錯,linux下不會報錯。
    # data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
    # for item in data:
    #     # print(item[0], item[1])
    #     print(item.age, item.count)  # 建議通過label()方法給字段起別名, 以屬性方式獲取數據

    # 14:隻查詢所有人的姓名和郵箱,這種相當於全表查詢,效率非常低。
    data = db.session.query(User.name, User.email).all()
    for item in data:
        print(item.name, item.email)

    # 15:優化查詢
    data = User.query.options(load_only(User.name, User.email)).all()
    for item in data:
        print(item.name, item.email)
    return "index"

3:修改數據:

1: 推薦采用方案二:
2: 一條語句, 被網絡IO影響程度低, 執行效率更高
3:查詢和更新在一條語句中完成, 單條SQL具有原子性, 不會出現更新丟失問題
4:會對滿足過濾條件的所有記錄進行更新, 可以實現批量更新處理

方案一:先查詢再更新:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相關配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:[email protected]:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 創建組件對象
db = SQLAlchemy(app)


# 構建模型類  商品表
class Goods(db.Model):
    __tablename__ = 't_good'  # 設置表名
    id = db.Column(db.Integer, primary_key=True)  # 設置主鍵
    name = db.Column(db.String(20), unique=True)  # 商品名稱
    count = db.Column(db.Integer)  # 剩餘數量


@app.route('/')
def purchase():
    """購買商品"""

    # 更新方式1: 先查詢後更新
    # 缺點: 並發情況下, 容易出現更新丟失問題 (Lost Update)
    # 1.執行查詢語句, 獲取目標模型對象
    goods = Goods.query.filter(Goods.name == '方便面').first()
    # 2.對模型對象的屬性進行賦值 (更新數據)
    goods.count = goods.count - 1
    # 3.提交會話
    db.session.commit()

    return "index"


if __name__ == '__main__':
    # 刪除所有繼承自db.Model的表
    db.drop_all()
    # 創建所有繼承自db.Model的表
    db.create_all()

    # 添加一條測試數據
    goods = Goods(name='方便面', count=1)
    db.session.add(goods)
    db.session.commit()
    app.run(host="0.0.0.0", port=8000, debug=True)

方案二:配合查詢過濾器filter() 和 更新執行器update() 進行數據更新

  Goods.query.filter(Goods.name == '方便面').update({'count': Goods.count - 1})
    db.session.commit()

4:刪除數據:

方案一:

 # 方式1: 先查後刪除
    goods = Goods.query.filter(Goods.name == '方便面').first()
    # 刪除數據
    db.session.delete(goods)
    # 提交會話 增刪改都要提交會話
    db.session.commit()

方案二:

# 方式2: delete子查詢
    Goods.query.filter(Goods.name == '方便面').delete()
    # 提交會話
    db.session.commit()

三:高級機制:

1:刷新數據:

1:Session 被設計為數據操作的執行者, 會先將操作產生的數據保存到內存中。
2: 在執行 flush刷新操作 後, 數據操作才會同步到數據庫中。
3:隱式刷新操作:1:提交會話 2:查詢操作(包括更新和刪除中的子查詢)。
4:手動刷新:session.flush()

刷新機制的理解:

答:刷新機制就是通過事務,將SQl語句執行一遍,然後將執行結果存儲在變量中,但是數據庫做回滾操作。導致變量中有瞭新值,但是數據庫卻沒有改變。

 	goods = Goods(name='方便面', count=20)
    db.session.add(goods)
    # 主動執行flush操作, 立即執行SQL操作(數據庫同步)
    print(goods.id)  # 此時是None
    db.session.flush()
	print(goods.id) # 此時是1
    # Goods.query.count()  # 查詢操作會自動執行flush操作
    db.session.commit()  # 提交會話會自動執行flush操作

2:多表查詢:

2.1:外鍵關聯查詢:

生成主表對象後,必須刷新數據庫,否則後面無法使用主表對象的屬性。

1:主從表的定義:

# 用戶表  一   一個用戶可以有多個地址
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20))


# 地址表   多
class Address(db.Model):
    __tablename__ = 't_adr'
    id = db.Column(db.Integer, primary_key=True)
    detail = db.Column(db.String(20))
    user_id = db.Column(db.Integer)  # 定義外鍵

2:添加關聯數據:

def index():
    """添加並關聯數據"""

    user1 = User(name='張三')
    db.session.add(user1)
    db.session.flush()  # 必須刷新,不然後面的user1.id是None
    adr1 = Address(detail='中關村3號', user_id=user1.id)
    adr2 = Address(detail='華強北5號', user_id=user1.id)
    db.session.add_all([adr1, adr2])
    db.session.commit()
    return "index"

3:關聯查詢:

# 1.先根據姓名查找到主表主鍵
    user1 = User.query.filter_by(name='張三').first()

    # 2.再根據主鍵到從表查詢關聯地址
    adrs = Address.query.filter_by(user_id=user1.id).all()
    for adr in adrs:
        print(adr.detail)

到此這篇關於Flask-Sqlalchemy的基本使用詳解的文章就介紹到這瞭,更多相關Flask-Sqlalchemy 使用內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: