Python爬虫几个步骤教你写入mysql数据库
Python爬虫几个步骤教你写入mysql数据库
Python爬虫实现爬取网站中的数据并存入MySQL数据库中。在爬取的时候总要涉及到数据持久化存储,当然有很多种存储的方式,简单点的有excel、txt、json、csv等等。存入mysql我觉得有好多操作空间,如果是开发python后端也可以顺便熟悉一下sql语句。存入数据库的方法我也试了网上的一些做法,现在把完整功能整理出来供大家参考。
直接搜索 phpStudy安装即可,按照下图配置数据库。用户名密码自行设置,然后返回首页启动即可。
pip install pymysql
打开刚安装的phpStudy,安装一个mysql客户端并连接数据库。数据库是本地的,host可以填 127.0.0.1 或 localhost,用户名和密码是上面设置的。
MySQL创建对应的表
-- Table that stores one crawled article per row.
-- `keywords` now carries DEFAULT '' like the other NOT NULL varchar columns,
-- so an INSERT that omits it no longer fails under strict SQL mode.
-- NOTE(review): AUTO_INCREMENT = 2692 is presumably left over from an export
-- of an existing table; drop it if ids should start at 1.
CREATE TABLE `text_archives` (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '链接',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '标题',
  `image` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '图片',
  `keywords` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '关键描述',
  `description` varchar(600) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '内容描述',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL COMMENT '内容',
  `weigh` int(10) NOT NULL DEFAULT 0 COMMENT '权重',
  `createtime` bigint(16) NOT NULL DEFAULT 0 COMMENT '创建时间',
  `updatetime` bigint(16) NOT NULL DEFAULT 0 COMMENT '更新时间',
  `deletetime` bigint(16) NULL DEFAULT NULL COMMENT '删除时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2692 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '内容表' ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
构造 SQL 语句的字符串 sql ,然后通过 cursor.execute(sql) 执行,下面是简单的封装,直接复制即可用。
class Mysql(object):
    """Small convenience wrapper around one PyMySQL connection and cursor.

    Every method commits on success. Failures are printed and reported
    through the return value (``None`` / ``False``) instead of being raised,
    and the open transaction is rolled back.

    The connection settings default to the values used in this tutorial and
    can be overridden per instance. The object can be used as a context
    manager, in which case the connection is closed on exit.
    """

    def __init__(self, host='127.0.0.1', user='test', password='######',
                 database='test', charset='utf8mb4'):
        """Open the connection and create a dict cursor.

        Args:
            host: MySQL server host.
            user: Login user name.
            password: Login password.
            database: Schema to use.
            charset: Connection character set.
        """
        # Imported here rather than at module level so this module can be
        # imported (and its SQL-building logic tested) without the driver.
        import pymysql

        self._connect = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            charset=charset,
            cursorclass=pymysql.cursors.DictCursor,
        )
        self._cursor = self._connect.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.db_close()

    @staticmethod
    def _build_insert(table_name, insert_data):
        """Return ``(sql, args)`` for a parameterized single-row INSERT."""
        columns = list(insert_data)
        sql = "INSERT INTO {table_name}({fields}) values ({values})".format(
            table_name=table_name,
            fields=",".join(columns),
            values=",".join(["%s"] * len(columns)),
        )
        return sql, tuple(insert_data[column] for column in columns)

    def inset_db(self, table_name, insert_data):
        """Insert one row and return its auto-increment id.

        Values are passed to the driver as query parameters, so quotes and
        other special characters in the data are escaped by the driver
        instead of breaking (or injecting into) the statement. ``None`` is
        therefore stored as SQL NULL rather than the string ``'None'``.
        Table and column names are still interpolated as-is and must come
        from trusted code.

        Args:
            table_name: Target table.
            insert_data: Mapping of column name to value.

        Returns:
            ``lastrowid`` of the cursor on success, ``None`` on failure.
        """
        try:
            sql, args = self._build_insert(table_name, insert_data)
            self._cursor.execute(sql, args)
            self._connect.commit()
        except Exception as e:
            # Undo whatever part of the statement (or of a stored procedure
            # it invoked) was already applied.
            self._connect.rollback()
            print("数据插入失败,失败原因:", e)
            print(insert_data)
            return None
        return self._cursor.lastrowid

    # Correctly spelled alias; ``inset_db`` is kept for existing callers.
    insert_db = inset_db

    def update_db(self, table_name, update_data, wheres=None, args=None):
        """Run ``UPDATE table_name SET update_data [WHERE wheres]``.

        Args:
            table_name: Target table.
            update_data: SQL text of the SET clause, e.g. ``"title=%s"``.
            wheres: Optional SQL text of the WHERE clause.
            args: Optional parameters for ``%s`` placeholders used in
                ``update_data`` / ``wheres``.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        if wheres is not None:
            sql = "UPDATE {table_name} SET {update_data} WHERE {wheres}".format(
                table_name=table_name, update_data=update_data, wheres=wheres
            )
        else:
            sql = "UPDATE {table_name} SET {update_data}".format(
                table_name=table_name, update_data=update_data
            )
        try:
            self._cursor.execute(sql, args)
            self._connect.commit()
        except Exception as e:
            self._connect.rollback()
            print("更新失败:", e)
            return False
        return True

    def delete_db(self, table_name, wheres, args=None):
        """Run ``DELETE FROM table_name WHERE wheres``.

        Args:
            table_name: Target table.
            wheres: SQL text of the WHERE clause.
            args: Optional parameters for ``%s`` placeholders in ``wheres``.

        Returns:
            ``True`` on success, ``False`` on failure.
        """
        sql = "DELETE FROM {table_name} WHERE {wheres}".format(
            table_name=table_name, wheres=wheres
        )
        try:
            self._cursor.execute(sql, args)
            self._connect.commit()
        except Exception as e:
            self._connect.rollback()
            print('删除失败:', e)
            return False
        return True

    def select_db(self, table_name, fields, wheres=None, get_one=False, args=None):
        """Run ``SELECT fields FROM table_name [WHERE wheres]``.

        Args:
            table_name: Table to read.
            fields: SQL text of the column list.
            wheres: Optional SQL text of the WHERE clause.
            get_one: Return only the first row instead of all rows.
            args: Optional parameters for ``%s`` placeholders in ``wheres``.

        Returns:
            The result of ``fetchone()`` when ``get_one`` is true, otherwise
            the result of ``fetchall()``; ``None`` on failure.
        """
        if wheres is not None:
            sql = "SELECT {fields} FROM {table_name} WHERE {wheres}".format(
                fields=fields, table_name=table_name, wheres=wheres
            )
        else:
            sql = "SELECT {fields} FROM {table_name}".format(
                fields=fields, table_name=table_name
            )
        try:
            self._cursor.execute(sql, args)
            # NOTE(review): presumably committed to end the implicit
            # transaction so later reads see fresh rows; kept as in the
            # original.
            self._connect.commit()
            if get_one:
                return self._cursor.fetchone()
            return self._cursor.fetchall()
        except Exception as e:
            print("查询失败", e)
            return None

    def get_mysql_data(self, data):
        """Return ``[fields, values]`` as comma-separated SQL fragments.

        Each value is converted with ``str()`` and wrapped in single quotes
        WITHOUT escaping, so the result is unsafe for untrusted data.
        ``inset_db`` no longer uses this helper; it is kept for callers that
        rely on the original output format.
        """
        fields = ",".join(data).strip(',')
        insert_data = ",".join("'" + str(value) + "'" for value in data.values())
        return [fields, insert_data]

    def db_close(self):
        """Close the cursor and then the connection."""
        self._cursor.close()
        self._connect.close()
这次简单点,咱们用xpath就行。有一个小技巧:在要爬取的网页上按F12打开开发者模式,如下图红框,复制第一个或者第二个就行。
下面代码是实现爬取数据然后存入数据库类,大家可参考
import json
from urllib import parse

import requests
from fake_useragent import UserAgent
from lxml import etree

from lib.reptile import Reptile
from model.nav import Nav


class Common(object):
    """XPath-driven crawler for a list page and the detail pages it links to.

    ``params`` is a dict that must contain ``url`` (the list page) and the
    XPath expressions read below: ``item_xpath``, ``url_xpath``,
    ``title_xpath``, ``category_xpath``, ``image_xpath``, ``link_xpaht``
    (spelled this way throughout), ``description_xpath``, ``keywords_xpath``
    and ``content_xpath``. An optional ``pid`` overrides the parent category
    id (default 158).
    """

    # (key in the record, key in self.params, progress message) for every
    # field extracted from a detail page, in extraction order.
    _FIELDS = (
        ("title", 'title_xpath', "标题为:{}"),
        ("category_name", 'category_xpath', "分类为:{}"),
        ("image", 'image_xpath', "图片为:{}"),
        ("url", 'link_xpaht', "链接为:{}"),
        ("description", 'description_xpath', "描述为:{}"),
        ("keywords", 'keywords_xpath', "关键描述:{}"),
        ("content", 'content_xpath', "内容为:{}"),
    )

    def __init__(self, params):
        self.url = params['url']
        self.params = params
        # 1-based number of the current attempt of the latest get_html() call.
        self.blog = 1
        # Created on first use; building a UserAgent per request is wasteful.
        self._ua = None

    def get_header(self):
        """Return request headers with a random User-Agent."""
        if self._ua is None:
            self._ua = UserAgent()
        return {'User-Agent': self._ua.random}

    def get_html(self, url, max_attempts=3):
        """Fetch ``url`` and return its text, or ``None`` if every attempt fails.

        The original recursive version dropped the result of the retry, so a
        page that succeeded on the second or third attempt was still
        reported as ``None``; the loop below returns it.

        Args:
            url: Page to download.
            max_attempts: How many times to try before giving up.
        """
        self.blog = 1
        while self.blog <= max_attempts:
            try:
                res = requests.get(url=url, headers=self.get_header(), timeout=3)
                res.encoding = res.apparent_encoding
                return res.text
            except requests.RequestException as e:
                print(e)
                self.blog += 1
        return None

    def json_insert_data(self, params):
        """Store one crawled record: first its category, then the article.

        NOTE(review): ``insert_category`` and ``insert_archives`` are not
        defined in the visible code; presumably a subclass or mixin provides
        them - confirm before running.
        """
        category_id = self.insert_category(cname=params['category_name'], pid=params['pid'], icon='')
        if not category_id:
            return
        # Report success only after the insert actually returned an id.
        print("分类插入成功:{}".format(params['category_name']))
        self.insert_archives(
            category_id,
            params['url'],
            params['title'],
            params['image'],
            params['description'],
            params['keywords'],
            params['content'],
        )
        print("内容插入成功:{}".format(params['title']))
        print("------------------------------------------------------------")

    def get_item(self, xpath_html):
        """Follow the first link found in every list item of the list page."""
        item_list = xpath_html.xpath(self.params['item_xpath'])
        print(item_list)
        for row in item_list:
            url_list = row.xpath(self.params['url_xpath'])
            if url_list:
                self.get_content(url_list[0])

    def get_content(self, url):
        """Download one detail page, extract its fields and store them.

        Links to other hosts are ignored. A record is stored only when
        title, category and link are all non-empty.
        """
        # Relative hrefs have no netloc and used to be skipped silently;
        # resolve them against the list page first.
        url = parse.urljoin(self.url, url)
        print("正在抓取链接:{}".format(url))
        if parse.urlparse(url).netloc != parse.urlparse(self.url).netloc:
            return
        html = self.get_html(url)
        if not html:
            return
        p = etree.HTML(html)
        record = {"pid": self.params.get('pid', 158)}
        for key, xpath_key, message in self._FIELDS:
            record[key] = self.get_conmon_content(p, self.params[xpath_key])
            print(message.format(record[key]))
        if record["title"] and record["category_name"] and record["url"]:
            self.json_insert_data(record)  # persist to the database

    def get_conmon_content(self, xpath_html, xpath):
        """Return the stripped first match of ``xpath``, or ``''`` if none."""
        content_list = xpath_html.xpath(xpath)
        return content_list[0].strip() if content_list else ''

    def run(self):
        """Fetch the list page and crawl every item on it."""
        print("url:{}".format(self.url))
        html = self.get_html(self.url)
        if html:
            self.get_item(etree.HTML(html))


# XPath configuration for the crawl.
# NOTE(review): get_item() reads params['item_xpath'], which is missing here,
# so run() raises KeyError until the XPath selecting each list item is added.
params = {
    "url": "https://www.widiz.com/",  # list page to crawl
    "url_xpath": './/a[1]/@href',
    "title_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/h1/text()',
    "category_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/a[1]/text()',
    "image_xpath": '/html/body/div[1]/div[2]/div[3]/div/div[2]/div/img/@src',
    "link_xpaht": '/html/body/div[1]/div[2]/div[3]/div/div[3]/div/div/div[1]/span/a/@href',
    "description_xpath": '/html/head/meta[10]/@content',
    "keywords_xpath": '/html/head/meta[5]/@content',
    "content_xpath": '/html/body/div[1]/div[2]/div[3]/main/div[1]/div/div[1]/div/div[2]/text()',
}

if __name__ == "__main__":
    Common(params).run()
最终效果: