Postgresql的Python数据访问管理类
####################################################################################
## File : dao_postgre.py
## Desc : The dao class to access the postgresql database.
## Author : Li G.Q.
## Date : 2021/9/16/
## Notes : Please keep the copyright information if you copy/modify this file.
#####################################################################################
from logging import exception
from flask import Flask,abort,request,session
from urllib.parse import unquote # For Python 2 use: from urlparse import unquote
from config import db_config
from config import app_config
from core.util import Util
from core.logger import Logger
import logging
import psycopg2
import json
import sys
import traceback
sys.path.append("..")
'''操作Postgresql数据库的类'''
class Postgresql:
'''数据库连接'''
conn = None;
log = None;
debug = app_config.debug;
util = None;
def __init__(self):
'''
构建函数,初始化数据库连接
'''
self.log = Logger(__name__);
self.util = Util();
try:
# 打开数据库连接
self.conn = psycopg2.connect(
host = db_config.postgre_config.host
,user = db_config.postgre_config.user
,password = db_config.postgre_config.password
,database = db_config.postgre_config.database
,port = db_config.postgre_config.port
);
except psycopg2.Error as e:
if self.debug==True:
print(" # Error in postgre init function %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def close(self):
'''
关闭数据库连接
'''
try:
self.conn.close();
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def get(self,param):
'''
获取数据表记录,返回结果集
参数:
param:dict型参数,如{'fields':'字段列表','where':'查询条件','limit':分页大小,"page":页号}
返回:
结果集JSON
'''
try:
if("fields" in param.keys()):
if(param["fields"] ==""):
param["fields"] = "*";
else:
param["fields"] = "*";
sql = f'select {param["fields"]} from {param["table"]}';
if("where" in param.keys() and param["where"] and param["where"] !=""):
sql+=" where "+unquote(param["where"]); #decodeURIComponent in js
if("order" in param.keys() and param["order"] and param["order"] !=""):
sql+=" order by " +unquote(param["order"]);
if("limit" in param.keys() and param["limit"] and int(param["limit"])>0):
sql+=f' limit {param["limit"]}';
if("page" in param.keys() and param["page"] and int(param["page"])>=0):
sql+=f' OFFSET '+str(int(param["page"])*int(param["limit"]));
return self.ExecQuery(sql);
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def insert(self,param):
'''
添加数据表记录
参数:
param:dict型参数,如{'table':'数据表','字段':字段值}
返回:
None
'''
try:
table = None;
fields = "";
values = "";
for key in param.keys():
if key == 'table':
table = param[key];
else:
if fields!="":
fields+=",";
values+=",";
fields+=key;
if self.util.var_type(param[key]) == "str":
values+="'"+param[key]+"'";
elif self.util.var_type(param[key]) == "int" or self.util.var_type(param[key]) == "float":
values+=str(param[key]);
elif self.util.var_type(param[key]) == "datetime.datetime":
values+="'"+param[key].strftime("%Y-%m-%d %H:%M:%S")+"'";
elif self.util.var_type(param[key]) == "bool":
if param[key] == True:
values+="1";
else:
values+="0";
else:
values+="'"+param[key]+"'";
sql = f'INSERT INTO {table}({fields})values({values})';
print(" # sql: ",sql);
return self.ExecNoQuery(sql);
# except exception as ex:
# self.log.write(logging.ERROR,ex);
# raise Exception(ex);
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def delete(self,param):
'''
删除数据表记录
参数:
param:dict型参数,如{'table':'数据表','where':'查询条件'}
返回:
None
'''
try:
sql = f'DELETE from {param["table"]}';
if ("where" in param.keys() and param["where"] and param["where"] !=""):
sql+=" where "+unquote(param["where"]); #decodeURIComponent in js
elif("id" in param.keys()):
sql+= " where id="+param["id"];
return self.ExecNoQuery(sql);
# except exception as ex:
# self.log.write(logging.ERROR,ex);
# raise Exception(ex);
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def update(self,param):
'''
更新数据
参数:
param:dict型参数,如{'table':'数据表','字段':字段值},其中必须包括id的值,否则无法更新
返回:
int: 0 - 成功,2003 - 更新有错误
'''
try:
where = "";
if param['table']==None:
return 2003;
sql = f"UPDATE {param['table']} set ";
i=0;
for key in param.keys():
if key == 'table':
table = param[key];
elif(key == 'id'):
where = f'id = {param[key]}';
else:
sql +=(',' if i>0 else '') +f'{key} = ';
if self.util.var_type(param[key]) == "str":
sql +=f"'{param[key]}'";
elif self.util.var_type(param[key]) == "int" or self.util.var_type(param[key]) == "float":
sql += str(param[key]);
elif self.util.var_type(param[key]) == "datetime.datetime":
sql +=f"'{param[key].strftime('%Y-%m-%d %H:%M:%S')}'";
elif self.util.var_type(param[key]) == "bool":
if param[key] == True:
sql += "true";
else:
sql += "false";
else:
sql +="'"+param[key]+"'";
i += 1;
sql += f' where {where}';
print(" # sql: ",sql);
if(self.ExecNoQuery(sql)):
return 0;
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
def ExecQuery(self, sql):
'''
执行查询语句
Args:
sql - 要执行的SQL语句
Returns:
记录集JSON
'''
cur = self.conn.cursor();
rows = None;
try:
# print(" #sql: ",sql);
cur.execute(sql);
rows = cur.fetchall();
column = [t[0] for t in cur.description];
# print("Column: ",column);
result = [];
for row in rows:
myrow = {};
for i in range(0,len(column)):
myrow[column[i]]= row[i];
result.append(myrow);
if(len(result)>0):
return result;
else:
return None;
except psycopg2.Error as e:
if(len(e.args)==1):
print(" # Error : %s. \t[%s]" % (e.args[0],sql));
self.log.write(logging.ERROR,"[1000]\t"+e.args[0]+"\tSQL: "+sql);
raise Exception(str(e.args[0]));
else:
print(" # Error %d: %s. \t[%s]" % (e.args[0], e.args[1],sql));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]+"\tSQL: "+sql);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
finally:
cur.close();
def ExecNoQuery(self, sql):
'''
执行查询语句,如Create,Insert,Delete,update,drop等。
Args:
sql - 要执行的SQL语句
Returns:
None
'''
cur = self.conn.cursor();
try:
cur.execute(sql)
self.conn.commit()
return True;
except psycopg2.Error as e:
if(len(e.args)==1):
print(" # Error : %s. \t[%s]" % (e.args[0],sql));
self.log.write(logging.ERROR,"[1000]\t"+e.args[0]+"\tSQL: "+sql);
raise Exception(str(e.args[0]));
else:
print(" # Error %d: %s. \t[%s]" % (e.args[0], e.args[1],sql));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]+"\tSQL: "+sql);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
finally:
cur.close();
# self.conn.close();
def count(self,table,where=None):
'''
查询满足条件的记录数
参数:
table - 表名
where - 查询条件(不含where关键词)
返回:
int 记录数
'''
cur = self.conn.cursor();
rows = None;
myresult = None;
try:
sql = f'select count(0) as cnt from {table}';
if where!=None and where!='':
sql = sql +' where '+where;
cur.execute(sql);
# Get all data of list tuples.
row = cur.fetchone();
return row[0];
# print(" # myresult: ",myresult);
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);
finally:
cur.close();
def exists(self,table,where):
'''
检查是否存在符合条件的记录
参数:
table - 表名
where - 查询条件(不含where关键词)
返回:
bool
True - 存在
False - 不存在
'''
try:
n = self.count(table,where);
return n>0;
except psycopg2.Error as e:
if self.debug==True:
print(" # Error %d: %s" % (e.args[0], e.args[1]));
self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);
raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);