欢迎光临散文网 会员登陆 & 注册

推荐收藏!这几段祖传的 Python 代码拿来就用

2023-03-24 15:34 作者:下班被游戏打-  | 我要投稿

今天分享几段工作生活中常用的代码,都是最为基础的功能和操作,而且大多还都是出现频率比较高的,很多都是可以拿来直接使用或者简单修改就可以放到自己的项目当中。喜欢的记得收藏、关注、点赞。

废话不多说,我们开始吧

日期生成

很多时候我们需要批量生成日期,方法有很多,这里分享两段代码

获取过去 N 天的日期

import datetimedef get_nday_list(n):    before_n_days = []     for i in range(1, n + 1)[::-1]:         before_n_days.append(str(datetime.date.today() - datetime.timedelta(days=i)))     return before_n_days a = get_nday_list(30) print(a)

Output:

['2021-12-23', '2021-12-24', '2021-12-25', '2021-12-26', '2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30', '2021-12-31', '2022-01-01', '2022-01-02', '2022-01-03', '2022-01-04', '2022-01-05', '2022-01-06', '2022-01-07', '2022-01-08', '2022-01-09', '2022-01-10', '2022-01-11', '2022-01-12', '2022-01-13', '2022-01-14', '2022-01-15', '2022-01-16', '2022-01-17', '2022-01-18', '2022-01-19', '2022-01-20', '2022-01-21']

生成一段时间内的日期

import datetimedef create_assist_date(datestart = None,dateend = None):    # 创建日期辅助表    if datestart is None:         datestart = '2016-01-01'    if dateend is None:         dateend = datetime.datetime.now().strftime('%Y-%m-%d')     # 转为日期格式    datestart=datetime.datetime.strptime(datestart,'%Y-%m-%d')     dateend=datetime.datetime.strptime(dateend,'%Y-%m-%d')     date_list = []     date_list.append(datestart.strftime('%Y-%m-%d'))     while datestart<dateend:         # 日期叠加一天        datestart+=datetime.timedelta(days=+1)         # 日期转字符串存入列表        date_list.append(datestart.strftime('%Y-%m-%d'))     return date_list d_list = create_assist_date(datestart='2021-12-27', dateend='2021-12-30') d_list

Output:

['2021-12-27', '2021-12-28', '2021-12-29', '2021-12-30']

保存数据到CSV

保存数据到 CSV 是太常见的操作了,分享一段我个人比较喜欢的写法

def save_data(data, date):     if not os.path.exists(r'2021_data_%s.csv' % date):         with open("2021_data_%s.csv" % date"a+"encoding='utf-8'as f:             f.write("标题,热度,时间,url\n")             for i in data:                 title = i["title"]                 extra = i["extra"]                 time = i['time']                 url = i["url"]                 row = '{},{},{},{}'.format(title,extra,time,url)                 f.write(row)                 f.write('\n')     else:         with open("2021_data_%s.csv" % date"a+"encoding='utf-8'as f:             for i in data:                 title = i["title"]                 extra = i["extra"]                 time = i['time']                 url = i["url"]                 row = '{},{},{},{}'.format(title,extra,time,url)                 f.write(row)                 f.write('\n')

带背景颜色的 Pyecharts

Pyecharts 作为 Echarts 的优秀 Python 实现,受到众多开发者的青睐,用 Pyecharts 作图时,使用一个舒服的背景也会给我们的图表增色不少

以饼图为例,通过添加 JavaScript 代码来改变背景颜色

def pie_rosetype(data) -> Pie:    background_color_js = (     "new echarts.graphic.LinearGradient(0, 0, 0, 1, "    "[{offset: 0, color: '#c86589'}, {offset: 1, color: '#06a7ff'}], false)")     c = (         Pie(init_opts=opts.InitOpts(bg_color=JsCode(background_color_js)))         .add(             "",             data,             radius=["30%""75%"],             center=["45%""50%"],             rosetype="radius",             label_opts=opts.LabelOpts(formatter="{b}: {c}"),         )         .set_global_opts(title_opts=opts.TitleOpts(title=""),                         )     )     return c

requests 库调用

据统计,requests 库是 Python 家族里被引用得最多的第三方库,足见其江湖地位之高大!

发送 GET 请求

import requests headers = {     'user-agent''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',   'cookie''some_cookie'} response = requests.request("GET", url, headers=headers)

发送 POST 请求

import requests payload={} files=[] headers = {     'user-agent''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',   'cookie''some_cookie'} response = requests.request("POST", url, headers=headers, data=payload, files=files)

根据某些条件循环请求,比如根据生成的日期

def get_data(mydate):    date_list = create_assist_date(mydate)     url = "https://test.test"    files=[]     headers = {         'user-agent''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',         'cookie'''        }     for d in date_list:         payload={'p''10',         'day': d,         'nodeid''1',         't''itemsbydate',         'c''node'}         for i in range(1100):             payload['p'] = str(i)             print("get data of %s in page %s" % (d, str(i)))             response = requests.request("POST", url, headers=headers, data=payload, files=files)             items = response.json()['data']['items']             if items:                 save_data(items, d)             else:                 break

Python 操作各种数据库

操作 Redis

连接 Redis

import redisdef redis_conn_pool():    pool = redis.ConnectionPool(host='localhost', port=6379, decode_responses=True)     rd = redis.Redis(connection_pool=pool)     return rd

写入 Redis

from redis_conn import redis_conn_pool rd = redis_conn_pool() rd.set('test_data''mytest')

操作 MongoDB

连接 MongoDB

from pymongo import MongoClient conn = MongoClient("mongodb://%s:%s@ipaddress:49974/mydb" % ('username''password')) db = conn.mydb mongo_collection = db.mydata

批量插入数据

res = requests.get(url, params=query).json() commentList = res['data']['commentList'] mongo_collection.insert_many(commentList)

操作 MySQL

连接 MySQL

import MySQLdb# 打开数据库连接db = MySQLdb.connect("localhost""testuser""test123""TESTDB", charset='utf8' )# 使用cursor()方法获取操作游标 cursor = db.cursor()

执行 SQL 语句

# 使用 execute 方法执行 SQL 语句cursor.execute("SELECT VERSION()")# 使用 fetchone() 方法获取一条数据data = cursor.fetchone()print "Database version : %s " % data# 关闭数据库连接db.close()

Output:

Database version : 5.0.45

本地文件整理

整理文件涉及需求的比较多,这里分享的是将本地多个 CSV 文件整合成一个文件

import pandas as pdimport os df_list = []for i in os.listdir():     if "csv" in i:         day = i.split('.')[0].split('_')[-1]         df = pd.read_csv(i)         df['day'] = day         df_list.append(df) df = pd.concat(df_list, axis=0) df.to_csv("total.txt", index=0)

多线程代码

多线程也有很多实现方式,我们选择自己最为熟悉顺手的方式即可

import threading import time exitFlag = 0class myThread (threading.Thread):    def __init__(self, threadID, name, delay):         threading.Thread.__init__(self)         self.threadID = threadID         self.name = name         self.delay = delay     def run(self):         print ("开始线程:" + self.name)         print_time(self.name, self.delay, 5)         print ("退出线程:" + self.name)def print_time(threadName, delay, counter):     while counter:        if exitFlag:            threadName.exit()         time.sleep(delay)         print ("%s: %s" % (threadName, time.ctime(time.time())))         counter -= 1# 创建新线程thread1 = myThread(1"Thread-1"1) thread2 = myThread(2"Thread-2"2)# 开启新线程thread1.start() thread2.start() thread1.join() thread2.join() print ("退出主线程")

异步编程代码

异步爬取网站

import asyncioimport aiohttpimport aiofilesasync def get_html(session, url):    try:         async with session.get(url=url, timeout=8as resp:             if not resp.status // 100 == 2:                 print(resp.status)                 print("爬取", url, "出现错误")             else:                 resp.encoding = 'utf-8'                text = await resp.text()                 return text     except Exception as e:         print("出现错误", e)         await get_html(session, url)

使用异步请求之后,对应的文件保存也需要使用异步,即是一处异步,处处异步

async def download(title_list, content_list):    async with aiofiles.open('{}.txt'.format(title_list[0]), 'a',                              encoding='utf-8'as f:         await f.write('{}'.format(str(content_list)))


以上就是我平时用得最多的代码片段,希望对你有所帮助

好了,这就是今天分享的全部内容,喜欢就点个赞吧


推荐收藏!这几段祖传的 Python 代码拿来就用的评论 (共 条)

分享到微博请遵守国家法律