初级爬虫实战——麻省理工学院新闻
文章目录
- 发现宝藏
- 一、 目标
- 二、 浅析
- 三、获取所有模块
- 四、请求处理模块、版面、文章
- 1. 分析切换页面的参数传递
- 2. 获取共有多少页标签并遍历版面
- 3.解析版面并保存版面信息
- 4. 解析文章列表和文章
- 5. 清洗文章
- 6. 保存文章图片
- 五、完整代码
- 六、效果展示
发现宝藏
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。【宝藏入口】。
一、 目标
爬取news.mit.edu的字段,包含标题、内容,作者,发布时间,链接地址,文章快照 (可能需要翻墙才能访问)
二、 浅析
1.全部新闻大致分为4个模块
2.每个模块的标签列表大致如下
3.每个标签对应的文章列表大致如下
4.具体每篇文章对应的结构如下
三、获取所有模块
其实就四个模块,列举出来就好,然后对每个分别解析爬取每个模块
class MitnewsScraper: def __init__(self, root_url, model_url, img_output_dir): self.root_url = root_url self.model_url = model_url self.img_output_dir = img_output_dir self.headers = { 'Referer': 'https://news.mit.edu/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/122.0.0.0 Safari/537.36', 'Cookie': '替换成你自己的', } ... def run(): root_url = 'https://news.mit.edu/' model_urls = ['https://news.mit.edu/topic', 'https://news.mit.edu/clp', 'https://news.mit.edu/department', 'https://news.mit.edu/'] output_dir = 'D:\imgs\mit-news' for model_url in model_urls: scraper = MitnewsScraper(root_url, model_url, output_dir) scraper.catalogue_all_pages()
四、请求处理模块、版面、文章
先处理一个模块(TOPICS)
1. 分析切换页面的参数传递
如图可知是get请求,需要传一个参数page
2. 获取共有多少页标签并遍历版面
实际上是获取所有的page参数,然后进行遍历获取所有的标签
# 获取一个模块有多少版面 def catalogue_all_pages(self): response = requests.get(self.model_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') try: match = re.search(r'of (\d+) topics', soup.text) total_catalogues = int(match.group(1)) total_pages = math.ceil(total_catalogues / 20) print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据') for page in range(0, total_pages): self.parse_catalogues(page) print(f"========Finished catalogues page {page + 1}========") except: self.parse_catalogues(0)
3.解析版面并保存版面信息
前三个模块的版面列表
第四个模块的版面列表
# 解析版面列表里的版面 def parse_catalogues(self, page): params = {'page': page} response = requests.get(self.model_url, params=params, headers=self.headers) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') if self.root_url == self.model_url: catalogue_list = soup.find('div', 'site-browse--recommended-section site-browse--recommended-section--schools') catalogues_list = catalogue_list.find_all('li') else: catalogue_list = soup.find('ul', 'page-vocabulary--views--list') catalogues_list = catalogue_list.find_all('li') for index, catalogue in enumerate(catalogues_list): # 操作时间 date = datetime.now() # 版面标题 catalogue_title = catalogue.find('a').get_text(strip=True) print('第' + str(index + 1) + '个版面标题为:' + catalogue_title) catalogue_href = catalogue.find('a').get('href') # 版面id catalogue_id = catalogue_href[1:] catalogue_url = self.root_url + catalogue_href print('第' + str(index + 1) + '个版面地址为:' + catalogue_url) # 根据版面url解析文章列表 response = requests.get(catalogue_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') match = re.search(r'of (\d+)', soup.text) # 查找一个版面有多少篇文章 total_cards = int(match.group(1)) total_pages = math.ceil(total_cards / 15) print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据') for page in range(0, total_pages): self.parse_cards_list(page, catalogue_url, catalogue_id) print(f"========Finished {catalogue_title} 版面 page {page + 1}========") # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['mit-news'] # 创建或选择集合 catalogues_collection = db['catalogues'] # 插入示例数据到 catalogues 集合 catalogue_data = { 'id': catalogue_id, 'date': date, 'title': catalogue_title, 'url': catalogue_url, 'cardSize': total_cards } catalogues_collection.insert_one(catalogue_data) return True else: raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")
4. 解析文章列表和文章
寻找冗余部分并删除,例如
# 解析文章列表里的文章 def parse_cards_list(self, page, url, catalogue_id): params = {'page': page} response = requests.get(url, params=params, headers=self.headers) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') card_list = soup.find('div', 'page-term--views--list') cards_list = card_list.find_all('div', 'page-term--views--list-item') for index, card in enumerate(cards_list): # 对应的版面id catalogue_id = catalogue_id # 操作时间 date = datetime.now() # 文章标题 card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text( strip=True) # 文章简介 card_introduction = card.find('p', 'term-page--news-article--item--dek').find('span').get_text( strip=True) # 文章更新时间 publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get( 'datetime') updateTime = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ') # 文章地址 temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href') url = 'https://news.mit.edu' + temp_url # 文章id pattern = r'(\w+(-\w+)*)-(\d+)' match = re.search(pattern, temp_url) card_id = str(match.group(0)) card_response = requests.get(url, headers=self.headers) soup = BeautifulSoup(card_response.text, 'html.parser') # 原始htmldom结构 html_title = soup.find('div', id='block-mit-page-title') html_content = soup.find('div', id='block-mit-content') # 合并标题和内容 html_title.append(html_content) html_cut1 = soup.find('div', 'news-article--topics') html_cut2 = soup.find('div', 'news-article--archives') html_cut3 = soup.find('div', 'news-article--content--side-column') html_cut4 = soup.find('div', 'news-article--press-inquiries') html_cut5 = soup.find_all('div', 'visually-hidden') html_cut6 = soup.find('p', 'news-article--images-gallery--nav--inner') # 移除元素 if html_cut1: html_cut1.extract() if html_cut2: html_cut2.extract() if html_cut3: html_cut3.extract() if html_cut4: html_cut4.extract() if html_cut5: for item in html_cut5: item.extract() if html_cut6: html_cut6.extract() # 获取合并后的内容文本 html_content = html_title # 文章作者 author_list = html_content.find('div', 'news-article--authored-by').find_all('span') author = '' for item in author_list: author = author + item.get_text() # 增加保留html样式的源文本 origin_html = html_content.prettify() # String # 转义网页中的图片标签 str_html = self.transcoding_tags(origin_html) # 再包装成 temp_soup = BeautifulSoup(str_html, 'html.parser') # 反转译文件中的插图 str_html = self.translate_tags(temp_soup.text) # 绑定更新内容 content = self.clean_content(str_html) # 下载图片 imgs = [] img_array = soup.find_all('div', 'news-article--image-item') for item in img_array: img_url = self.root_url + item.find('img').get('data-src') imgs.append(img_url) if len(imgs) != 0: # 下载图片 illustrations = self.download_images(imgs, card_id) # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['mit-news'] # 创建或选择集合 cards_collection = db['cards'] # 插入示例数据到 catalogues 集合 card_data = { 'id': card_id, 'catalogueId': catalogue_id, 'type': 'mit-news', 'date': date, 'title': card_title, 'author': author, 'card_introduction': card_introduction, 'updatetime': updateTime, 'url': url, 'html_content': str(html_content), 'content': content, 'illustrations': illustrations, } cards_collection.insert_one(card_data) return True else: raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}")
5. 清洗文章
# 工具 转义标签 def transcoding_tags(self, htmlstr): re_img = re.compile(r'\s*\s*', re.M) s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr) # IMG 转义 return s # 工具 转义标签 def translate_tags(self, htmlstr): re_img = re.compile(r'@@##(img.*?)##@@', re.M) s = re_img.sub(r'', htmlstr) # IMG 转义 return s # 清洗文章 def clean_content(self, content): if content is not None: content = re.sub(r'\r', r'\n', content) content = re.sub(r'\n{2,}', '', content) content = re.sub(r' {6,}', '', content) content = re.sub(r' {3,}\n', '', content) content = re.sub(r'', '', content) content = content.replace( ' ', '') content = content.replace( ''' e}') except Exception as e: print(f'保存图片时发生错误:{e}') return downloaded_images # 如果文件夹存在则跳过 else: print(f'文章id为{card_id}的图片文件夹已经存在') return []
五、完整代码
import os from datetime import datetime import requests from bs4 import BeautifulSoup from pymongo import MongoClient import re import math class MitnewsScraper: def __init__(self, root_url, model_url, img_output_dir): self.root_url = root_url self.model_url = model_url self.img_output_dir = img_output_dir self.headers = { 'Referer': 'https://news.mit.edu/', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/122.0.0.0 Safari/537.36', 'Cookie': '替换成你自己的' } # 获取一个模块有多少版面 def catalogue_all_pages(self): response = requests.get(self.model_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') try: match = re.search(r'of (\d+) topics', soup.text) total_catalogues = int(match.group(1)) total_pages = math.ceil(total_catalogues / 20) print('topics模块一共有' + match.group(1) + '个版面,' + str(total_pages) + '页数据') for page in range(0, total_pages): self.parse_catalogues(page) print(f"========Finished catalogues page {page + 1}========") except: self.parse_catalogues(0) # 解析版面列表里的版面 def parse_catalogues(self, page): params = {'page': page} response = requests.get(self.model_url, params=params, headers=self.headers) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') if self.root_url == self.model_url: catalogue_list = soup.find('div', 'site-browse--recommended-section site-browse--recommended-section--schools') catalogues_list = catalogue_list.find_all('li') else: catalogue_list = soup.find('ul', 'page-vocabulary--views--list') catalogues_list = catalogue_list.find_all('li') for index, catalogue in enumerate(catalogues_list): # 操作时间 date = datetime.now() # 版面标题 catalogue_title = catalogue.find('a').get_text(strip=True) print('第' + str(index + 1) + '个版面标题为:' + catalogue_title) catalogue_href = catalogue.find('a').get('href') # 版面id catalogue_id = catalogue_href[1:] catalogue_url = self.root_url + catalogue_href print('第' + str(index + 1) + '个版面地址为:' + catalogue_url) # 根据版面url解析文章列表 response = requests.get(catalogue_url, headers=self.headers) soup = BeautifulSoup(response.text, 'html.parser') match = re.search(r'of (\d+)', soup.text) # 查找一个版面有多少篇文章 total_cards = int(match.group(1)) total_pages = math.ceil(total_cards / 15) print(f'{catalogue_title}版面一共有{total_cards}篇文章,' + f'{total_pages}页数据') for page in range(0, total_pages): self.parse_cards_list(page, catalogue_url, catalogue_id) print(f"========Finished {catalogue_title} 版面 page {page + 1}========") # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['mit-news'] # 创建或选择集合 catalogues_collection = db['catalogues'] # 插入示例数据到 catalogues 集合 catalogue_data = { 'id': catalogue_id, 'date': date, 'title': catalogue_title, 'url': catalogue_url, 'cardSize': total_cards } catalogues_collection.insert_one(catalogue_data) return True else: raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}") # 解析文章列表里的文章 def parse_cards_list(self, page, url, catalogue_id): params = {'page': page} response = requests.get(url, params=params, headers=self.headers) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') card_list = soup.find('div', 'page-term--views--list') cards_list = card_list.find_all('div', 'page-term--views--list-item') for index, card in enumerate(cards_list): # 对应的版面id catalogue_id = catalogue_id # 操作时间 date = datetime.now() # 文章标题 card_title = card.find('a', 'term-page--news-article--item--title--link').find('span').get_text( strip=True) # 文章简介 card_introduction = card.find('p', 'term-page--news-article--item--dek').find('span').get_text( strip=True) # 文章更新时间 publish_time = card.find('p', 'term-page--news-article--item--publication-date').find('time').get( 'datetime') updateTime = datetime.strptime(publish_time, '%Y-%m-%dT%H:%M:%SZ') # 文章地址 temp_url = card.find('div', 'term-page--news-article--item--cover-image').find('a').get('href') url = 'https://news.mit.edu' + temp_url # 文章id pattern = r'(\w+(-\w+)*)-(\d+)' match = re.search(pattern, temp_url) card_id = str(match.group(0)) card_response = requests.get(url, headers=self.headers) soup = BeautifulSoup(card_response.text, 'html.parser') # 原始htmldom结构 html_title = soup.find('div', id='block-mit-page-title') html_content = soup.find('div', id='block-mit-content') # 合并标题和内容 html_title.append(html_content) html_cut1 = soup.find('div', 'news-article--topics') html_cut2 = soup.find('div', 'news-article--archives') html_cut3 = soup.find('div', 'news-article--content--side-column') html_cut4 = soup.find('div', 'news-article--press-inquiries') html_cut5 = soup.find_all('div', 'visually-hidden') html_cut6 = soup.find('p', 'news-article--images-gallery--nav--inner') # 移除元素 if html_cut1: html_cut1.extract() if html_cut2: html_cut2.extract() if html_cut3: html_cut3.extract() if html_cut4: html_cut4.extract() if html_cut5: for item in html_cut5: item.extract() if html_cut6: html_cut6.extract() # 获取合并后的内容文本 html_content = html_title # 文章作者 author_list = html_content.find('div', 'news-article--authored-by').find_all('span') author = '' for item in author_list: author = author + item.get_text() # 增加保留html样式的源文本 origin_html = html_content.prettify() # String # 转义网页中的图片标签 str_html = self.transcoding_tags(origin_html) # 再包装成 temp_soup = BeautifulSoup(str_html, 'html.parser') # 反转译文件中的插图 str_html = self.translate_tags(temp_soup.text) # 绑定更新内容 content = self.clean_content(str_html) # 下载图片 imgs = [] img_array = soup.find_all('div', 'news-article--image-item') for item in img_array: img_url = self.root_url + item.find('img').get('data-src') imgs.append(img_url) if len(imgs) != 0: # 下载图片 illustrations = self.download_images(imgs, card_id) # 连接 MongoDB 数据库服务器 client = MongoClient('mongodb://localhost:27017/') # 创建或选择数据库 db = client['mit-news'] # 创建或选择集合 cards_collection = db['cards'] # 插入示例数据到 catalogues 集合 card_data = { 'id': card_id, 'catalogueId': catalogue_id, 'type': 'mit-news', 'date': date, 'title': card_title, 'author': author, 'card_introduction': card_introduction, 'updatetime': updateTime, 'url': url, 'html_content': str(html_content), 'content': content, 'illustrations': illustrations, } cards_collection.insert_one(card_data) return True else: raise Exception(f"Failed to fetch page {page}. Status code: {response.status_code}") # 下载图片 def download_images(self, img_urls, card_id): # 根据card_id创建一个新的子目录 images_dir = os.path.join(self.img_output_dir, card_id) if not os.path.exists(images_dir): os.makedirs(images_dir) downloaded_images = [] for index, img_url in enumerate(img_urls): try: response = requests.get(img_url, stream=True, headers=self.headers) if response.status_code == 200: # 从URL中提取图片文件名 img_name_with_extension = img_url.split('/')[-1] pattern = r'^[^?]*' match = re.search(pattern, img_name_with_extension) img_name = match.group(0) # 保存图片 with open(os.path.join(images_dir, img_name), 'wb') as f: f.write(response.content) downloaded_images.append([img_url, os.path.join(images_dir, img_name)]) except requests.exceptions.RequestException as e: print(f'请求图片时发生错误:{e}') except Exception as e: print(f'保存图片时发生错误:{e}') return downloaded_images # 如果文件夹存在则跳过 else: print(f'文章id为{card_id}的图片文件夹已经存在') return [] # 工具 转义标签 def transcoding_tags(self, htmlstr): re_img = re.compile(r'\s*\s*', re.M) s = re_img.sub(r'\n @@##\1##@@ \n', htmlstr) # IMG 转义 return s # 工具 转义标签 def translate_tags(self, htmlstr): re_img = re.compile(r'@@##(img.*?)##@@', re.M) s = re_img.sub(r'', htmlstr) # IMG 转义 return s # 清洗文章 def clean_content(self, content): if content is not None: content = re.sub(r'\r', r'\n', content) content = re.sub(r'\n{2,}', '', content) content = re.sub(r' {6,}', '', content) content = re.sub(r' {3,}\n', '', content) content = re.sub(r'', '', content) content = content.replace( ' ', '') content = content.replace( '''
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。