1. 介绍

ORM框架将数据库中的表(表结构)映射为面向对象的类(对象),将表中的记录(行)映射为类的实例(对象的实例),将表中的字段(列)映射为类的属性(对象的属性)。通过ORM框架,可以直接使用面向对象的方式来进行数据库操作,比如增删改查等,而不必编写复杂的SQL语句。

ORM框架的主要作用包括:

  1. 简化数据库操作ORM 框架封装了底层数据库的操作细节,提供了高级的对象操作接口,使得数据库操作更加简单和直观。
  2. 提高开发效率ORM 框架提供了自动化的数据库映射和对象关系管理,减少了开发人员对数据库操作的工作量,提高了开发效率。
  3. 提高代码的可维护性:通过ORM框架,可以将数据模型和业务逻辑解耦,使得代码更加模块化和易于维护。
  4. 提高跨数据库的兼容性ORM 框架通常具有良好的跨数据库兼容性,可以轻松地切换不同的数据库,而不必修改大量的代码。
  5. 防止SQL注入攻击ORM框架通常会对用户输入的数据进行参数化处理,从而有效地防止了SQL注入攻击。

2. 依赖安装

2.1 安装sqlalchemy

@注意: 虽然sqlalchemy已经升级到2.0, 但发现自动生成模型工具sqlacodegen还是基于sqlalchemy 1.4生成代码,所以这里仍然使用sqlalchemy 1.4版本 。

# 安装
$ python-learn pip install sqlalchemy==1.4.51
...
Installing collected packages: sqlalchemy
Successfully installed sqlalchemy-1.4.51

2.2 安装模型生成器

# 这里指定版本安装,本人体验的是最新版本
$ pip install sqlacodegen==3.0.0rc3

3.生成model

3.1 编写脚本

文件: bin/genmodels.sh

#!/bin/bash
# 判断参数是否为空
if [ -z "$1" ]; then
echo -e " 使用说明: $0 connect db_type
[connect示例]:
mysql: mysql+pymysql://用户名:密码@127.0.0.1:3306/数据库名
postgresql: postgresql://username:password@localhost:5432/database_name
mongodb: mongodb://username:password@localhost:27017/database_name
[db_type示例]:
mysql、postgresql、mongodb
"
exit 1
fi

# 提取数据库类型
db_type=$(echo "$2" | awk -F: '{print $1}')
# 模型文件目录
model_path="app/dao/models/"

echo "db_type: $db_type"
# 生成模型文件名
output_file="$2"
case "$db_type" in
mysql)
output_file="${output_file}_gen.py"
;;
postgresql)
output_file="${output_file}_gen.py"
;;
mongodb)
output_file="${output_file}_gen.py"
;;
*)
echo "数据库类型只能是[mysql/postgresql/mongodb] database type: $db_type"
exit 1
;;
esac

# 使用 sqlacodegen 生成模型文件
sqlacodegen "$1" > "${model_path}$output_file"
echo "Generated models file: $output_file"

3.2 运行脚本

# 赋执行权限
$ chmod 777 bin/genmodels.sh

# 运行
$ bash bin/genmodels.sh
使用说明: bin/genmodels.sh connect db_type
[connect示例]:
mysql: mysql+pymysql://用户名:密码@127.0.0.1:3306/数据库名
postgresql: postgresql://username:password@localhost:5432/database_name
mongodb: mongodb://username:password@localhost:27017/database_name
[db_type示例]:
mysql、postgresql、mongodb

3.3 生成model

@注意: 关于本次测试使用的库SQL文件在目录: static/sql/user.sql

# 执行脚本
$ bash bin/genmodels.sh mysql+pymysql://root:root@127.0.0.1:3306/test mysql
db_type: mysql
Generated models file: mysql_gen.py

运行上述命令后,会把数据库(test)中所有的表,生成对应的model,存到文件:app/dao/models/mysql_gen.py中。

4. 封装集成

4.1 添加配置

文件: .env

# -------- 数据库配置 --------
DB_DSN=mysql+pymysql://root:root@127.0.0.1:3306/test # 数据库连接
DB_ECHO_SQL=True # 使用打印SQL日志信息
DB_POOL_SIZE=5 # 连接池中的初始连接数,默认为 5
DB_MAX_OVERFLOW=10 # 连接池中允许的最大超出连接数

4.2 封装会话

文件: app/dao/base_dao.py

from sqlalchemy import create_engine
from app.config import globalAppSettings
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# 创建引擎
engine = create_engine(
globalAppSettings.db_dsn,
echo=globalAppSettings.db_echo_sql, # 是否打印SQL
pool_size=globalAppSettings.db_pool_size, # 连接池的大小,指定同时在连接池中保持的数据库连接数,默认:5
max_overflow=globalAppSettings.db_max_overflow, # 超出连接池大小的连接数,超过这个数量的连接将被丢弃,默认: 5
)

# 封装获取会话
Session = sessionmaker(bind=engine, expire_on_commit=False)


@contextmanager
def getDatabaseSession(autoCommitByExit=True):
"""使用上下文管理资源关闭"""
_session = Session()
try:
yield _session
# 退出时,是否自动提交
if autoCommitByExit:
_session.commit()
except Exception as e:
_session.rollback()
raise e

@注意:使用sessionmaker需要设置属性expire_on_commit=False,否则会出现报错:Instance is not bound to a Session  

4.3 封装dao

文件: app/dao/user_dao.py


from sqlalchemy import desc
from .base_dao import getDatabaseSession
from app.dao.models import YmUser


class UserQueryDao(object):
"""用户查询类dao"""

@classmethod
def findByPhone(cls, phone: str) -> YmUser:
"""单条查询示例"""
with getDatabaseSession() as session:
query = session.query(YmUser).filter(YmUser.phone == phone)
result = query.first()
return result

@classmethod
def findByPage(
cls, page: int = 1, pageSize: int = 10, **kwargs
) -> (int, list[YmUser]):
"""分页查询示例"""
with getDatabaseSession() as session:
query = session.query(YmUser)
# 填充具体查询条件
for column, value in kwargs.items():
if not hasattr(YmUser, column):
continue

# 根据值类型,来组装查询条件
if isinstance(value, tuple):
# 范围查询
query = query.filter(getattr(YmUser, column).between(*value))
elif isinstance(value, list):
# in查询
query = query.filter(getattr(YmUser, column).in_(value))
elif isinstance(value, str) and value.find("%") != -1:
# 模糊查询
query = query.filter(getattr(YmUser, column).like(value))
else:
# 等值查询
query = query.filter(getattr(YmUser, column) == value)

# 查询总条数
total = query.count()
# 排序分页
offset = (page - 1) * pageSize
query = query.order_by(desc(YmUser.id)).offset(offset).limit(pageSize)
# 查询记录
result = query.all()

return total, result


class UserOperateDao(object):
"""操作用户相关dao"""

@classmethod
def saveUser(cls, user: YmUser) -> YmUser:
"""添加单条"""
with getDatabaseSession(False) as session:
session.add(user)
session.commit()
session.refresh(user)
return user

@classmethod
def saveUserList(cls, users: list[YmUser]):
"""添加单条"""
with getDatabaseSession() as session:
session.bulk_save_objects(users)
return

4.4 单元测试

文件: tests/user_query_dao_test.py

在之前的文章: Python库学习(十四):ORM框架-SQLAlchemy: https://mp.weixin.qq.com/s/y9FTKYl_Kf6opNgddUWa1g 学过SQLAlchemy的一些基本使用,这里只做简单示例演示

import unittest
from datetime import datetime

from app import dao
from app.dao import models


class UserDaoTestCase(unittest.TestCase):
def test_findByPhone(self):
"""单条查询测试"""
result = dao.UserQueryDao.findByPhone("17408049453")
self.assertNotEqual(result.id, 0)

def test_findByPage(self):
"""分页查询测试"""
# 10, age=30, gender='male', height=(160, 180), city=['New York', 'Los Angeles']
total, result = dao.UserQueryDao.findByPage(
1,
10,
id=(10, 30),
phone=["17804116371", "17350624789", "17654732912", "17545435626"],
nick_name="%雨%",
)
self.assertNotEqual(len(result), 0)

def test_saveUser(self):
"""单条查询测试"""
result = dao.UserOperateDao.saveUser(
models.YmUser(
union_id="ui_12344343434",
open_id="op_ksjdhjjkdhdjdhh",
nick_name="娃哈哈",
password="123456",
email="test@163.com",
phone="17600000000",
last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
avatar="http://img-avatar.com/head-abc.jpg",
)
)
print(result.id)
self.assertNotEqual(result.id, 0)


if __name__ == "__main__":
unittest.main()

5. 业务实例

5.1 请求流程简述

现在要实现一个简单的用户列表查询接口,下面是整个业务的简图:

image-20240510154215412

5.2 定义控制器

文件: app/controller/user_router.py

from fastapi import APIRouter
from app.types import apiproto
from app.service import usersvc
from app import utils

router = APIRouter(prefix="/user", tags=["用户相关接口"])

@router.post("/list")
async def userList(param: apiproto.UserListRequest) -> utils.HttpResponse:
"""
用户列表-演示
"""
data = usersvc.UserListService.getUserList(param)
return utils.ResponseSuccess(data)

@注意: 此处为了节省篇幅,忽略注册控制到框架的代码,具体使用可参考之前的代码。

5.3 编写servcie

文件: app/service/usersvc/user_list_svc.py

from typing import List
from app.types import apiproto
from app import dao


class UserListService:
"""用户列表"""

@classmethod
def getUserList(cls, queryParam: apiproto.UserListRequest) -> apiproto.UserListResponse:
"""查询用户列表"""

# 拼凑查询信息
queryDict = {}
if queryParam.nick_name != "":
queryDict["nick_name"] = f"%{queryParam.nick_name}%"
if queryParam.phone != "":
queryDict["phone"] = queryParam.phone

total, result = dao.UserQueryDao.findByPage(
queryParam.page,
queryParam.pageSize,
**queryDict
)
if total == 0:
return apiproto.UserListResponse()

# 格式化数据
records_list: List[apiproto.UserDetailProto] = []

for record in result:
tmp = apiproto.UserDetailProto(
id=record.id,
union_id=record.union_id,
open_id=record.open_id,
nick_name=record.nick_name,
avatar=record.avatar,
phone=record.phone,
email=record.email,
last_login=record.last_login,
status=record.status,
delete_at=record.delete_at,
created_at=str(record.created_at),
updated_at=str(record.updated_at),
)
records_list.append(tmp)

return apiproto.UserListResponse(record_total=total, record_list=records_list)

5.4 请求验证

5.4.1 发起请求

curl -X 'POST' \
'http://0.0.0.0:8088/api/user/list' \
-H 'accept: application/json' \
-H 'Content-Type: application/json' \
-d '{
"nick_name": "辉",
"phone": "",
"page": 1,
"pageSize": 10
}'

5.4.2 响应结果

{
"code": 200,
"msg": "处理成功",
"data": {
"record_total": 128,
"record_list": [
{
"id": 9866,
"union_id": "ui_d6YXU4Ie3yaHnDiF7dRrl2iRWnB1Cz9gshxPkDb9xEf5f12j9X",
"open_id": "op_KQyeQOV1b0nVv823PP4HZvsM3XBeOdq8hPiy8aK6AYcBW9aLss",
"nick_name": "传辉宇彬胜",
"avatar": "http://img-avatar.com/head-DveZAsPp.jpg",
"phone": "17154750068",
"email": "DveZAsPp@163.com",
"last_login": "2023-12-01 11:52:28",
"status": 1,
"delete_at": "",
"created_at": "2024-01-04 11:52:33",
"updated_at": "2024-01-04 11:52:33"
},
{
"id": 9711,
"union_id": "ui_Nz2BaajGh20rUIBG23nUZT51wIxHdZbUsZMaUgPViiz11r8POn",
"open_id": "op_lJxnCcCaj2i5wMqsGy2AE1Iqd1fRdZ0Ngv9xFuVuMBAHbwUoKc",
"nick_name": "责た精萬辉",
"avatar": "http://img-avatar.com/head-WOOl4Bj4.jpg",
"phone": "15928359654",
"email": "WOOl4Bj4@163.com",
"last_login": "2023-11-29 11:52:27",
"status": 1,
"delete_at": "",
"created_at": "2024-01-04 11:52:33",
"updated_at": "2024-01-04 11:52:33"
},
省略...
]
},
"additional": {
"time": "2024-05-10 19:01:05",
"trace_id": "b3d74e53872c1596daf67daa379071a1"
}
}