1. 介绍 ORM
框架将数据库中的表(表结构)映射为面向对象的类(对象),将表中的记录(行)映射为类的实例(对象的实例),将表中的字段(列)映射为类的属性(对象的属性)。通过ORM
框架,可以直接使用面向对象的方式来进行数据库操作,比如增删改查等,而不必编写复杂的SQL
语句。
ORM
框架的主要作用包括:
简化数据库操作 :ORM
框架封装了底层数据库的操作细节,提供了高级的对象操作接口,使得数据库操作更加简单和直观。
提高开发效率 :ORM
框架提供了自动化的数据库映射和对象关系管理,减少了开发人员对数据库操作的工作量,提高了开发效率。
提高代码的可维护性 :通过ORM
框架,可以将数据模型和业务逻辑解耦,使得代码更加模块化和易于维护。
提高跨数据库的兼容性 :ORM
框架通常具有良好的跨数据库兼容性,可以轻松地切换不同的数据库,而不必修改大量的代码。
防止SQL
注入攻击 :ORM
框架通常会对用户输入的数据进行参数化处理,从而有效地防止了SQL
注入攻击。
2. 依赖安装 2.1 安装sqlalchemy
@注意: 虽然sqlalchemy
已经升级到2.0, 但发现自动生成模型工具sqlacodegen
还是基于sqlalchemy 1.4
生成代码,所以这里仍然使用sqlalchemy 1.4
版本 。
# 安装 $ python-learn pip install sqlalchemy==1.4.51 ... Installing collected packages: sqlalchemy Successfully installed sqlalchemy-1.4.51
2.2 安装模型生成器 $ pip install sqlacodegen==3.0.0rc3
3.生成model 3.1 编写脚本 文件: bin/genmodels.sh
#!/bin/bash if [ -z "$1 " ]; then echo -e " 使用说明: $0 connect db_type [connect示例]: mysql: mysql+pymysql://用户名:密码@127.0.0.1:3306/数据库名 postgresql: postgresql://username:password@localhost:5432/database_name mongodb: mongodb://username:password@localhost:27017/database_name [db_type示例]: mysql、postgresql、mongodb " exit 1fi db_type=$(echo "$2 " | awk -F: '{print $1}' ) model_path="app/dao/models/" echo "db_type: $db_type " output_file="$2 " case "$db_type " in mysql) output_file="${output_file} _gen.py" ;; postgresql) output_file="${output_file} _gen.py" ;; mongodb) output_file="${output_file} _gen.py" ;; *) echo "数据库类型只能是[mysql/postgresql/mongodb] database type: $db_type " exit 1 ;;esac sqlacodegen "$1 " > "${model_path} $output_file " echo "Generated models file: $output_file "
3.2 运行脚本 $ chmod 777 bin/genmodels.sh $ bash bin/genmodels.sh 使用说明: bin/genmodels.sh connect db_type [connect示例]: mysql: mysql+pymysql://用户名:密码@127.0.0.1:3306/数据库名 postgresql: postgresql://username:password@localhost:5432/database_name mongodb: mongodb://username:password@localhost:27017/database_name [db_type示例]: mysql、postgresql、mongodb
3.3 生成model
@注意: 关于本次测试使用的库SQL文件在目录: static/sql/user.sql
$ bash bin/genmodels.sh mysql+pymysql://root:root@127.0.0.1:3306/test mysql db_type: mysql Generated models file: mysql_gen.py
运行上述命令后,会把数据库(test
)中所有的表,生成对应的model
,存到文件:app/dao/models/mysql_gen.py
中。
4. 封装集成 4.1 添加配置 文件: .env
DB_DSN=mysql+pymysql://root:root@127.0.0.1:3306/test DB_ECHO_SQL=True DB_POOL_SIZE=5 DB_MAX_OVERFLOW=10
4.2 封装会话 文件: app/dao/base_dao.py
from sqlalchemy import create_enginefrom app.config import globalAppSettingsfrom sqlalchemy.orm import sessionmakerfrom contextlib import contextmanager engine = create_engine( globalAppSettings.db_dsn, echo=globalAppSettings.db_echo_sql, pool_size=globalAppSettings.db_pool_size, max_overflow=globalAppSettings.db_max_overflow, ) Session = sessionmaker(bind=engine, expire_on_commit=False )@contextmanager def getDatabaseSession (autoCommitByExit=True ): """使用上下文管理资源关闭""" _session = Session() try : yield _session if autoCommitByExit: _session.commit() except Exception as e: _session.rollback() raise e
@注意:使用sessionmaker需要设置属性expire_on_commit=False,否则会出现报错:Instance is not bound to a Session
4.3 封装dao 文件: app/dao/user_dao.py
from sqlalchemy import descfrom .base_dao import getDatabaseSessionfrom app.dao.models import YmUserclass UserQueryDao (object ): """用户查询类dao""" @classmethod def findByPhone (cls, phone: str ) -> YmUser: """单条查询示例""" with getDatabaseSession() as session: query = session.query(YmUser).filter (YmUser.phone == phone) result = query.first() return result @classmethod def findByPage ( cls, page: int = 1 , pageSize: int = 10 , **kwargs ) -> (int , list [YmUser]): """分页查询示例""" with getDatabaseSession() as session: query = session.query(YmUser) for column, value in kwargs.items(): if not hasattr (YmUser, column): continue if isinstance (value, tuple ): query = query.filter (getattr (YmUser, column).between(*value)) elif isinstance (value, list ): query = query.filter (getattr (YmUser, column).in_(value)) elif isinstance (value, str ) and value.find("%" ) != -1 : query = query.filter (getattr (YmUser, column).like(value)) else : query = query.filter (getattr (YmUser, column) == value) total = query.count() offset = (page - 1 ) * pageSize query = query.order_by(desc(YmUser.id )).offset(offset).limit(pageSize) result = query.all () return total, resultclass UserOperateDao (object ): """操作用户相关dao""" @classmethod def saveUser (cls, user: YmUser ) -> YmUser: """添加单条""" with getDatabaseSession(False ) as session: session.add(user) session.commit() session.refresh(user) return user @classmethod def saveUserList (cls, users: list [YmUser] ): """添加单条""" with getDatabaseSession() as session: session.bulk_save_objects(users) return
4.4 单元测试 文件: tests/user_query_dao_test.py
在之前的文章: Python库学习(十四):ORM框架-SQLAlchemy: https://mp.weixin.qq.com/s/y9FTKYl_Kf6opNgddUWa1g 学过SQLAlchemy
的一些基本使用,这里只做简单示例演示
import unittestfrom datetime import datetimefrom app import daofrom app.dao import modelsclass UserDaoTestCase (unittest.TestCase): def test_findByPhone (self ): """单条查询测试""" result = dao.UserQueryDao.findByPhone("17408049453" ) self.assertNotEqual(result.id , 0 ) def test_findByPage (self ): """分页查询测试""" total, result = dao.UserQueryDao.findByPage( 1 , 10 , id =(10 , 30 ), phone=["17804116371" , "17350624789" , "17654732912" , "17545435626" ], nick_name="%雨%" , ) self.assertNotEqual(len (result), 0 ) def test_saveUser (self ): """单条查询测试""" result = dao.UserOperateDao.saveUser( models.YmUser( union_id="ui_12344343434" , open_id="op_ksjdhjjkdhdjdhh" , nick_name="娃哈哈" , password="123456" , email="test@163.com" , phone="17600000000" , last_login=datetime.now().strftime("%Y-%m-%d %H:%M:%S" ), avatar="http://img-avatar.com/head-abc.jpg" , ) ) print (result.id ) self.assertNotEqual(result.id , 0 )if __name__ == "__main__" : unittest.main()
5. 业务实例 5.1 请求流程简述 现在要实现一个简单的用户列表查询接口,下面是整个业务的简图:
5.2 定义控制器 文件: app/controller/user_router.py
from fastapi import APIRouterfrom app.types import apiprotofrom app.service import usersvcfrom app import utils router = APIRouter(prefix="/user" , tags=["用户相关接口" ])@router.post("/list" ) async def userList (param: apiproto.UserListRequest ) -> utils.HttpResponse: """ 用户列表-演示 """ data = usersvc.UserListService.getUserList(param) return utils.ResponseSuccess(data)
@注意: 此处为了节省篇幅,忽略注册控制到框架的代码,具体使用可参考之前的代码。
5.3 编写servcie 文件: app/service/usersvc/user_list_svc.py
from typing import List from app.types import apiprotofrom app import daoclass UserListService : """用户列表""" @classmethod def getUserList (cls, queryParam: apiproto.UserListRequest ) -> apiproto.UserListResponse: """查询用户列表""" queryDict = {} if queryParam.nick_name != "" : queryDict["nick_name" ] = f"%{queryParam.nick_name} %" if queryParam.phone != "" : queryDict["phone" ] = queryParam.phone total, result = dao.UserQueryDao.findByPage( queryParam.page, queryParam.pageSize, **queryDict ) if total == 0 : return apiproto.UserListResponse() records_list: List [apiproto.UserDetailProto] = [] for record in result: tmp = apiproto.UserDetailProto( id =record.id , union_id=record.union_id, open_id=record.open_id, nick_name=record.nick_name, avatar=record.avatar, phone=record.phone, email=record.email, last_login=record.last_login, status=record.status, delete_at=record.delete_at, created_at=str (record.created_at), updated_at=str (record.updated_at), ) records_list.append(tmp) return apiproto.UserListResponse(record_total=total, record_list=records_list)
5.4 请求验证 5.4.1 发起请求 curl -X 'POST' \ 'http://0.0.0.0:8088/api/user/list' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "nick_name": "辉", "phone": "", "page": 1, "pageSize": 10 }'
5.4.2 响应结果 { "code" : 200 , "msg" : "处理成功" , "data" : { "record_total" : 128 , "record_list" : [ { "id" : 9866 , "union_id" : "ui_d6YXU4Ie3yaHnDiF7dRrl2iRWnB1Cz9gshxPkDb9xEf5f12j9X" , "open_id" : "op_KQyeQOV1b0nVv823PP4HZvsM3XBeOdq8hPiy8aK6AYcBW9aLss" , "nick_name" : "传辉宇彬胜" , "avatar" : "http://img-avatar.com/head-DveZAsPp.jpg" , "phone" : "17154750068" , "email" : "DveZAsPp@163.com" , "last_login" : "2023-12-01 11:52:28" , "status" : 1 , "delete_at" : "" , "created_at" : "2024-01-04 11:52:33" , "updated_at" : "2024-01-04 11:52:33" } , { "id" : 9711 , "union_id" : "ui_Nz2BaajGh20rUIBG23nUZT51wIxHdZbUsZMaUgPViiz11r8POn" , "open_id" : "op_lJxnCcCaj2i5wMqsGy2AE1Iqd1fRdZ0Ngv9xFuVuMBAHbwUoKc" , "nick_name" : "责た精萬辉" , "avatar" : "http://img-avatar.com/head-WOOl4Bj4.jpg" , "phone" : "15928359654" , "email" : "WOOl4Bj4@163.com" , "last_login" : "2023-11-29 11:52:27" , "status" : 1 , "delete_at" : "" , "created_at" : "2024-01-04 11:52:33" , "updated_at" : "2024-01-04 11:52:33" } , 省略... ] } , "additional" : { "time" : "2024-05-10 19:01:05" , "trace_id" : "b3d74e53872c1596daf67daa379071a1" } }