Python web scraping: a few steps to write crawled data into a MySQL database
This article shows how a Python crawler can scrape data from a website and store it in a MySQL database. Scraping always involves persisting the data somewhere, and there are plenty of options; the simple ones include Excel, txt, JSON, CSV and so on. Storing the data in MySQL gives you far more room to work with it afterwards, and if you develop Python back ends it is also a good chance to get comfortable with SQL. I tried several approaches found online before settling on the one below, and the complete working code is provided here for reference.
Search for phpStudy, install it, and configure the MySQL database in its panel. Set your own username and password, then go back to the home page and start the service. Next, install the pymysql driver:
pip install pymysql
Open the newly installed phpStudy and add a MySQL client connection. Because the database is local, the host can be 127.0.0.1 or localhost, and the username and password are the ones you set above.
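Before going further, you can quickly verify the connection from Python. This is a minimal sketch; the database name test and the credentials here are assumptions matching the local setup described above, so substitute your own:

import pymysql

# Minimal connection check; host/user/password/database are placeholders
# for the local phpStudy setup configured above.
conn = pymysql.connect(
    host='127.0.0.1',
    user='test',
    password='your_password',
    database='test',
    charset='utf8mb4'
)
with conn.cursor() as cursor:
    cursor.execute("SELECT VERSION()")
    print(cursor.fetchone())  # prints the MySQL server version as a tuple
conn.close()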
Create the corresponding table in MySQL:
CREATE TABLE `text_archives` (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'link',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'title',
  `image` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'image',
  `keywords` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'keywords',
  `description` varchar(600) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'description',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT 'content',
  `weigh` int(10) NOT NULL DEFAULT 0 COMMENT 'weight',
  `createtime` bigint(16) NOT NULL DEFAULT 0 COMMENT 'create time',
  `updatetime` bigint(16) NOT NULL DEFAULT 0 COMMENT 'update time',
  `deletetime` bigint(16) NULL DEFAULT NULL COMMENT 'delete time',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2692 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = 'archives table' ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
The idea is to build the SQL statement as a string sql and then run it with cursor.execute(sql). Below is a simple wrapper class that you can copy and use directly; a short usage example follows the class.
import pymysql


class Mysql(object):
    def __init__(self):
        self._connect = pymysql.connect(
            host='127.0.0.1',
            user='test',
            password='######',
            database='test',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor
        )
        self._cursor = self._connect.cursor()

    def insert_db(self, table_name, insert_data):
        try:
            data = self.get_mysql_data(data=insert_data)
            fields = data[0]
            values = data[1]
            sql = "INSERT INTO {table_name}({fields}) VALUES ({values})".format(
                table_name=table_name, fields=fields, values=values)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            # If the statement fails (for example a stored-procedure call), roll the transaction back
            self._connect.rollback()
            print("Insert failed, reason:", e)
            print(insert_data)
        else:
            # self.db_close()
            return self._cursor.lastrowid

    def update_db(self, table_name, update_data, wheres=None):
        try:
            if wheres is not None:
                sql = "UPDATE {table_name} SET {update_data} WHERE {wheres}".format(
                    table_name=table_name,
                    update_data=update_data,
                    wheres=wheres
                )
            else:
                sql = "UPDATE {table_name} SET {update_data}".format(
                    table_name=table_name,
                    update_data=update_data)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            print("Update failed:", e)
            return False
        else:
            # self.db_close()
            return True

    def delete_db(self, table_name, wheres):
        try:
            # Build the SQL statement
            sql = "DELETE FROM {table_name} WHERE {wheres}".format(table_name=table_name, wheres=wheres)
            self._cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            print('Delete failed:', e)
            return False
        else:
            # self.db_close()
            return True

    def select_db(self, table_name, fields, wheres=None, get_one=False):
        try:
            if wheres is not None:
                sql = "SELECT {fields} FROM {table_name} WHERE {wheres}".format(
                    fields=fields,
                    table_name=table_name,
                    wheres=wheres
                )
            else:
                sql = "SELECT {fields} FROM {table_name}".format(fields=fields, table_name=table_name)
            self._cursor.execute(sql)
            self._connect.commit()
            if get_one:
                result = self._cursor.fetchone()
            else:
                result = self._cursor.fetchall()
        except Exception as e:
            print("Query failed:", e)
            return None
        else:
            # self.db_close()
            return result

    def get_mysql_data(self, data):
        # Turn a dict into "field1,field2" and "'value1','value2'" strings for the INSERT statement
        fields = ""
        insert_data = ""
        for k, v in data.items():
            fields = fields + k + ','
            insert_data = insert_data + "'" + str(v) + "'" + ','
        fields = fields.strip(',')
        insert_data = insert_data.strip(',')
        return [fields, insert_data]

    def db_close(self):
        self._cursor.close()
        self._connect.close()
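A quick usage sketch of the wrapper, assuming it lives in the same file as the class above and the text_archives table has been created; the column values are placeholders for whatever your crawler extracts:

import time

db = Mysql()

# Insert one crawled record; the dict keys must match the column names of text_archives
row_id = db.insert_db('text_archives', {
    'url': 'https://example.com/post/1',   # placeholder values
    'title': 'Example title',
    'image': '',
    'keywords': 'example',
    'description': 'Example description',
    'content': 'Example content',
    'createtime': int(time.time()),
    'updatetime': int(time.time()),
})
print(row_id)

# Read it back, update it, then delete it
print(db.select_db('text_archives', 'id,title,url', wheres="id={}".format(row_id), get_one=True))
db.update_db('text_archives', "title='New title'", wheres="id={}".format(row_id))
db.delete_db('text_archives', wheres="id={}".format(row_id))
db.db_close()

Note that the wrapper splices values straight into the SQL string, so a value containing a single quote will break the statement; if you extend this, pymysql's cursor.execute(sql, args) with %s placeholders handles escaping for you.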
To keep things simple, we will use XPath this time. A handy trick: open the page you want to scrape, press F12 to open the browser's developer tools, right-click the element and choose Copy, then pick either "Copy XPath" or "Copy full XPath"; either one works, and the absolute /html/body/... paths used later in this post come from the second option.
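The copied expression can be dropped straight into lxml. A minimal sketch, where the XPath is a placeholder standing in for whatever you copied from devtools:

import requests
from lxml import etree

# The URL is the target site used later in this post; the XPath below is a
# placeholder showing where the expression copied from devtools goes.
html = requests.get('https://www.widiz.com/', timeout=3).text
tree = etree.HTML(html)
titles = tree.xpath('//h1/text()')  # paste the copied XPath here
print(titles)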
The class below does the actual scraping and then writes the data to the database; use it as a reference.
import requests
from urllib import parse
from lxml import etree
from fake_useragent import UserAgent
from model.nav import Nav


# Nav (from the project's model layer, not shown here) is assumed to provide
# insert_category() and insert_archives(); Common inherits from it so those calls resolve.
class Common(Nav):
    def __init__(self, params):
        self.url = params['url']
        self.params = params
        self.blog = 1  # retry counter for failed requests

    def get_header(self):
        ua = UserAgent()
        headers = {
            'User-Agent': ua.random
        }
        return headers

    def get_html(self, url):
        # Retry a failed page up to three times (each request has a 3-second timeout)
        if self.blog <= 3:
            try:
                res = requests.get(url=url, headers=self.get_header(), timeout=3)
                res.encoding = res.apparent_encoding
                html = res.text
                return html
            except Exception as e:
                print(e)
                self.blog += 1
                return self.get_html(url)

    def json_insert_data(self, params):
        category_id = self.insert_category(cname=params['category_name'], pid=params['pid'], icon='')
        print("Category inserted: {}".format(params['category_name']))
        if category_id:
            url = params['url']
            title = params['title']
            image = params['image']
            description = params['description']
            keywords = params['keywords']
            content = params['content']
            self.insert_archives(category_id, url, title, image, description, keywords, content)
            print("Archive inserted: {}".format(title))
            print("------------------------------------------------------------")

    def get_item(self, xpath_html):
        item_list = xpath_html.xpath(self.params['item_xpath'])
        print(item_list)
        for row in item_list:
            url_list = row.xpath(self.params['url_xpath'])
            if len(url_list) > 0:
                self.get_content(url_list[0])

    def get_content(self, url):
        print("Crawling link: {}".format(url))
        domain = parse.urlparse(url).netloc
        d_domain = parse.urlparse(self.url).netloc
        if domain == d_domain:
            html = self.get_html(url)
            self.blog = 1  # reset the retry counter for the next page
            if html:
                p = etree.HTML(html)
                title = self.get_common_content(p, self.params['title_xpath'])
                print("Title: {}".format(title))
                category_name = self.get_common_content(p, self.params['category_xpath'])
                print("Category: {}".format(category_name))
                image = self.get_common_content(p, self.params['image_xpath'])
                print("Image: {}".format(image))
                link = self.get_common_content(p, self.params['link_xpath'])
                print("Link: {}".format(link))
                description = self.get_common_content(p, self.params['description_xpath'])
                print("Description: {}".format(description))
                keywords = self.get_common_content(p, self.params['keywords_xpath'])
                print("Keywords: {}".format(keywords))
                content = self.get_common_content(p, self.params['content_xpath'])
                print("Content: {}".format(content))
                params = {
                    "pid": 158,
                    "title": title,
                    "category_name": category_name,
                    "image": image,
                    'url': link,
                    'description': description,
                    'keywords': keywords,
                    'content': content,
                }
                if title and category_name and link:
                    self.json_insert_data(params)  # write to the database

    def get_common_content(self, xpath_html, xpath):
        content_list = xpath_html.xpath(xpath)
        content = ''
        if len(content_list) > 0:
            content = content_list[0].strip()
        return content

    def run(self):
        print("url:{}".format(self.url))
        html = self.get_html(self.url)
        if html:
            p = etree.HTML(html)
            self.get_item(p)
# XPath configuration for the crawl
params = {
    "url": "https://www.widiz.com/",  # start URL to crawl
    # get_item() also expects an "item_xpath" key that selects the list items on the start page;
    # add one that matches your target page.
    "url_xpath": './/a[1]/@href',
    "title_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/h1/text()',
    "category_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/a[1]/text()',
    "image_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[2]/div/img/@src',
    "link_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/div/div[1]/span/a/@href',
    "description_xpath": '/html/head/meta[10]/@content',
    "keywords_xpath": '/html/head/meta[5]/@content',
    "content_xpath": '/html/body/div[1]/div[2]/div[3]/main/div[1]/div/div[1]/div/div[2]/text()'
}
Common(params).run()

Final result: each successfully crawled page is written as a row into the text_archives table created earlier.
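To spot-check what was written, you can read a few rows back through the same wrapper; a minimal sketch, assuming the Mysql class from earlier is available in the session:

db = Mysql()
# The wheres argument is raw SQL, so ordering and limiting can be appended to it
rows = db.select_db('text_archives', 'id,title,url', wheres='1=1 ORDER BY id DESC LIMIT 5')
for row in rows or []:
    # DictCursor returns each row as a dict keyed by column name
    print(row['id'], row['title'], row['url'])
db.db_close()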

