SQLAlchemy 的配置链接文件
以下为SQLAlchemy的配置文件,请勿直接复制,建议手敲,毕竟网页复制会使得代码变形。
Python可是非常注意代码的格式的!
from sqlalchemy import Column,create_engine,MetaData,Table,func,event,exc
from sqlalchemy.orm import sessionmaker,scoped_session
from sqlalchemy.types import *
from sqlalchemy.ext.declarative import declarative_base
from urllib.parse import quote_plus
import os
# 数据库配置
dbConfig = {
"dbname":"数据库名",
"dbusername":"数据库用户名",
"dbpasswd":"数据库密码",
"dbhost":"数据库地址",
"dbport":3306,
"dbcharset":"数据库编码格式"
}
base = declarative_base()
engine = create_engine(
# 形成原则
# mysql+pymysql://数据库用户名:数据库密码@服务器地址:端口号/数据库名?charset=字符集
# 如果密码中带了@ 这个特殊字符,需要将quote_plus 这个方法包裹字符串 或者
"mysql+pymysql://fastapiblog:"+quote_plus('Abcd@123456')+"@localhost:3306/fastapi?charset=utf8mb4",
# 数据库连接池,默认为5,建议调整是调整到CPU内核数x2,因为调整到太大
pool_size=10,
# 这个入参是标识是否打印SQL建议在开发环境开启。
echo=True,
# MySQL 在默认情况下链接8小时没有任何操作,会自动断开链接,此后会出现MySQL has gone away的错误。
# 因此需要设置这个参数来回收链接。这个链接的单位是s也就是设置3000s之后,将会被sqlalchemy给回收。
pool_recycle=3000,
# 连接池ping,当从连接池拿链接的时候,会向数据库发送一个select 1(类似)的操作以确定是否数据库是否存活。
pool_pre_ping=True
)
metadata = MetaData(engine)
import datetime
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
connection_record.info['pid'] = os.getpid()
@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
pid = os.getpid()
if connection_record.info['pid'] != pid:
connection_record.connection = connection_proxy.connection = None
raise exc.DisconnectionError(
"Connection record belongs to pid %s, "
"attempting to check out in pid %s" %
(connection_record.info['pid'], pid)
)
'''
请将表建在下方
'''
# 固定形式需要继承base 否则sqlalchemy 是无法执行查询等操作的
class UserTable(base):
# 固定形式,必须在这里定义表名,否则无法查询到表
__tablename__ = "user"
# 定义int结构的主键,自增
id = Column(Integer,primary_key=True,autoincrement="auto")
create_time = Column(DATETIME)
update_time = Column(DATETIME)
username = Column(String(64))
password = Column(String(64))
mobile = Column(String(20))
email = Column(String(64))
status = Column(String(16))
'''
请将表建在上方
'''
SessionType = scoped_session(sessionmaker(bind=engine, expire_on_commit=False))
def GetSession():
return SessionType()
from contextlib import contextmanager
@contextmanager
def session_scope():
session = GetSession()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
# 检查数据库是否存在该表。
def check_table_exist(tablename):
with session_scope() as sess:
r = sess.execute(
"select TABLE_NAME from INFORMATION_SCHEMA.TABLES where TABLE_NAME='" + tablename + "' AND TABLE_SCHEMA='"+dbConfig["dbname"]+"';")
if len(r.fetchall()) >= 1:
return True
else:
return False
if __name__ == "__main__":
print(quote_plus('Abcd@123456'))
# 这个方法能够创建所有继承了base的表。
base.metadata.create_all(engine)