欢迎光临散文网 会员登陆 & 注册

Postgresql的Python数据访问管理类

2023-04-09 18:13 作者:夕林泉石  | 我要投稿


####################################################################################

## File     : dao_postgre.py

## Desc     : The dao class to access the postgresql database.

## Author   : Li G.Q.

## Date     : 2021/9/16/

## Notes    : Please keep the copyright information if you copy/modify this file.

#####################################################################################


from logging        import exception

from flask          import Flask,abort,request,session

from urllib.parse   import unquote # For Python 2 use: from urlparse import unquote


from config         import db_config

from config         import app_config

from core.util      import Util

from core.logger    import Logger


import logging

import psycopg2

import json 

import sys

import traceback


sys.path.append("..")


'''操作Postgresql数据库的类'''

class Postgresql:

    '''数据库连接'''

    conn    = None;

    log     = None;

    debug   = app_config.debug;

    util    = None;

    

    def __init__(self):

        '''

        构建函数,初始化数据库连接        

        '''

        self.log = Logger(__name__);

        self.util = Util();

        

        try:                      

            # 打开数据库连接

            self.conn = psycopg2.connect(

                                     host       = db_config.postgre_config.host

                                    ,user       = db_config.postgre_config.user

                                    ,password   = db_config.postgre_config.password

                                    ,database   = db_config.postgre_config.database

                                    ,port       = db_config.postgre_config.port

                                );          

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error in postgre init function %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        

    def close(self):

        '''

        关闭数据库连接

        '''

        try:

            self.conn.close();

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        

    def get(self,param):

        '''

        获取数据表记录,返回结果集

        

        参数:

            param:dict型参数,如{'fields':'字段列表','where':'查询条件','limit':分页大小,"page":页号}

        返回: 

            结果集JSON

        '''       

                

        try: 

            if("fields" in param.keys()):

                if(param["fields"] ==""):

                    param["fields"] = "*";

            else:

                param["fields"] = "*";

                

            sql = f'select {param["fields"]} from {param["table"]}';

            if("where" in param.keys() and param["where"] and param["where"] !=""):

                sql+=" where "+unquote(param["where"]);  #decodeURIComponent in js

            

            if("order" in param.keys() and param["order"] and param["order"] !=""):

                sql+=" order by " +unquote(param["order"]);

                

            if("limit" in param.keys() and param["limit"] and int(param["limit"])>0):

                sql+=f' limit {param["limit"]}';

                

            if("page" in param.keys() and param["page"] and int(param["page"])>=0):

                sql+=f' OFFSET '+str(int(param["page"])*int(param["limit"]));

                

            return self.ExecQuery(sql);

       

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

    

    def insert(self,param):

        '''

        添加数据表记录

        

        参数:

            param:dict型参数,如{'table':'数据表','字段':字段值}

        返回: 

            None

        '''       

                

        try: 

            table  = None;

            fields = "";

            values = "";

            for key in param.keys():

                if key == 'table':

                    table = param[key];

                else:

                    if fields!="":

                        fields+=",";                        

                        values+=",";                    

                    

                    fields+=key;

                    if self.util.var_type(param[key]) == "str":

                        values+="'"+param[key]+"'";

                    elif self.util.var_type(param[key]) == "int" or self.util.var_type(param[key]) == "float":

                        values+=str(param[key]);

                    elif self.util.var_type(param[key]) == "datetime.datetime":

                        values+="'"+param[key].strftime("%Y-%m-%d %H:%M:%S")+"'";

                    elif self.util.var_type(param[key]) == "bool":

                        if param[key] == True:

                            values+="1";

                        else:

                            values+="0";

                    else:

                        values+="'"+param[key]+"'";

                        

            sql = f'INSERT INTO {table}({fields})values({values})';    

            print(" # sql: ",sql);        

            

            return self.ExecNoQuery(sql);

        # except exception as ex:

        #     self.log.write(logging.ERROR,ex);

        #     raise Exception(ex);

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        

    def delete(self,param):

        '''

        删除数据表记录

        

        参数:

            param:dict型参数,如{'table':'数据表','where':'查询条件'}

        返回: 

            None

        '''       

                

        try: 

            sql = f'DELETE from {param["table"]}';

            if ("where" in param.keys() and param["where"] and param["where"] !=""):

                sql+=" where "+unquote(param["where"]);  #decodeURIComponent in js

            elif("id" in param.keys()):

                sql+= " where id="+param["id"];

            

            return self.ExecNoQuery(sql);

        # except exception as ex:

        #     self.log.write(logging.ERROR,ex);

        #     raise Exception(ex);

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

 

    def update(self,param):

        '''

        更新数据

        

        参数:

            param:dict型参数,如{'table':'数据表','字段':字段值},其中必须包括id的值,否则无法更新

        返回: 

            int: 0 - 成功,2003 - 更新有错误

        '''

        try: 

            where = "";

            if param['table']==None:

                return 2003;

            sql = f"UPDATE {param['table']} set ";

            i=0;

            for key in param.keys():

                if key == 'table':

                    table = param[key];

                elif(key == 'id'):

                    where = f'id = {param[key]}';

                else:                    

                    sql +=(',' if i>0 else '') +f'{key} = ';

                    if self.util.var_type(param[key]) == "str":

                        sql +=f"'{param[key]}'";

                    elif self.util.var_type(param[key]) == "int" or self.util.var_type(param[key]) == "float":

                        sql += str(param[key]);

                    elif self.util.var_type(param[key]) == "datetime.datetime":

                        sql +=f"'{param[key].strftime('%Y-%m-%d %H:%M:%S')}'";

                    elif self.util.var_type(param[key]) == "bool":

                        if param[key] == True:

                            sql += "true";

                        else:

                            sql += "false";

                    else:

                        sql +="'"+param[key]+"'";

                    i += 1;                        

            

            sql += f' where {where}';               

            print(" # sql: ",sql);        

            

            if(self.ExecNoQuery(sql)):

                return 0;

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

      

    def ExecQuery(self, sql):

        '''

        执行查询语句

        Args:

            sql - 要执行的SQL语句

        Returns:

            记录集JSON

        '''

        cur = self.conn.cursor();

        rows = None;

        try:

            # print(" #sql: ",sql);

            cur.execute(sql);

            rows = cur.fetchall();

            column = [t[0] for t in cur.description];

            # print("Column: ",column);

            result = [];

            

            for row in rows:

                myrow = {};

                for i in range(0,len(column)):

                    myrow[column[i]]= row[i];

                result.append(myrow);

            if(len(result)>0):           

                return result;

            else:

                return None;

        except psycopg2.Error as e:            

            if(len(e.args)==1):

                print(" # Error : %s. \t[%s]" % (e.args[0],sql));

                self.log.write(logging.ERROR,"[1000]\t"+e.args[0]+"\tSQL: "+sql);

                raise Exception(str(e.args[0]));

            else:

                print(" # Error %d: %s. \t[%s]" % (e.args[0], e.args[1],sql));

                self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]+"\tSQL: "+sql);

                raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        finally:

            cur.close();            


    def ExecNoQuery(self, sql):

        '''

        执行查询语句,如Create,Insert,Delete,update,drop等。

        Args:

            sql - 要执行的SQL语句

        Returns:

            None

        '''

        cur = self.conn.cursor();

        try:

            cur.execute(sql)

            self.conn.commit()

            return True;

        except psycopg2.Error as e:

            if(len(e.args)==1):

                print(" # Error : %s. \t[%s]" % (e.args[0],sql));

                self.log.write(logging.ERROR,"[1000]\t"+e.args[0]+"\tSQL: "+sql);

                raise Exception(str(e.args[0]));

            else:

                print(" # Error %d: %s. \t[%s]" % (e.args[0], e.args[1],sql));

                self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]+"\tSQL: "+sql);

                raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        finally:

            cur.close();

            # self.conn.close();


    def count(self,table,where=None):

        '''

        查询满足条件的记录数

        

        参数:

            table - 表名

            where - 查询条件(不含where关键词)

        返回:

            int 记录数            

        '''

        cur = self.conn.cursor();

        rows = None;

        myresult = None;

        try:

            sql = f'select count(0) as cnt from {table}';

            if where!=None and where!='':

                sql = sql +' where '+where;

            cur.execute(sql);

            # Get all data of list tuples.

            row = cur.fetchone();

            return row[0];

            # print(" # myresult: ",myresult);

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);

        finally:

            cur.close();


    def exists(self,table,where):

        '''

        检查是否存在符合条件的记录

        

        参数:

            table - 表名

            where - 查询条件(不含where关键词)

        返回:

            bool

            True - 存在

            False - 不存在

        '''

        try:            

            n = self.count(table,where);

            return n>0;

        except psycopg2.Error as e:

            if self.debug==True:

                print(" # Error %d: %s" % (e.args[0], e.args[1]));

            self.log.write(logging.ERROR,"["+str(e.args[0])+"]\t"+e.args[1]);

            raise Exception("["+str(e.args[0])+"]\t"+e.args[1]);


Postgresql的Python数据访问管理类的评论 (共 条)

分享到微博请遵守国家法律