概述
本指南将向你展示如何:- 开始一个专门针对Stripe博客文章的爬取
- 监控爬取进度
- 检索和处理爬取的内容
爬取Stripe的博客页面
要爬取Stripe的博客页面,使用带有模式匹配的爬取端点来定位特定的博客URL。这将获取每个页面的完整HTML内容,然后你可以处理这些内容以提取所需的信息。import requests
import time
import json
from datetime import datetime
# 配置
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {API_KEY}'
}
# 记录开始时间以跟踪爬取持续时间
crawl_start_time = time.time()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始爬取Stripe博客...")
# 开始一个专注于Stripe工程博客文章的爬取
# 你可以根据具体兴趣调整模式
payload = {
"start_url": "https://stripe.com/blog",
"include_urls": ["/blog/engineering/**"], # 专注于工程文章
"max_pages": 25 # 此示例限制为25页
}
# 开始爬取
print("开始爬取Stripe的工程博客文章...")
response = requests.post(f'{API_URL}/crawls', headers=HEADERS, json=payload)
data = response.json()
crawl_id = data['id']
print(f"爬取已开始,ID: {crawl_id}")
# 监控爬取进度
while True:
status_response = requests.get(f'{API_URL}/crawls/{crawl_id}', headers=HEADERS)
status_data = status_response.json()
print(f"爬取状态: {status_data['status']} - 已爬取页面: {status_data.get('pages_count', 0)}")
if status_data['status'] == 'completed' or status_data['status'] == 'failed':
break
# 等待5秒后再次检查
time.sleep(5)
# 计算并显示爬取持续时间
crawl_duration = time.time() - crawl_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] 爬取在 {crawl_duration:.2f} 秒内完成")
将博客内容转换为Markdown
使用爬取的内容的一种强大方式是将其转换为markdown格式,这非常适合用于输入到LLMs或创建知识库。以下是如何检索和转换博客内容为markdown:import requests
import time
import json
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# 配置
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {API_KEY}'
}
# 函数以markdown格式检索内容
def retrieve_content(retrieve_id, formats):
params = {
"retrieve_id": retrieve_id,
"formats": json.dumps(formats)
}
response = requests.get(f"{API_URL}/retrieve", headers=HEADERS, params=params)
return response.json()
# 继续之前的爬取示例
if status_data['status'] == 'completed':
print(f"\n爬取完成!已检索 {status_data['pages_count']} 页。")
pages_response = requests.get(f'{API_URL}/crawls/{crawl_id}/pages', headers=HEADERS)
pages_data = pages_response.json()
# 如果输出目录不存在则创建
os.makedirs("output", exist_ok=True)
# 准备收集markdown内容
markdown_pages = []
total_pages = len(pages_data['pages'])
# 并行处理页面以获取markdown内容
with ThreadPoolExecutor(max_workers=10) as executor:
# 创建内容检索的期物
future_to_page = {
executor.submit(retrieve_content, page['retrieve_id'], ["markdown"]): page
for page in pages_data['pages']
}
# 处理完成的结果
for i, future in enumerate(as_completed(future_to_page), 1):
page = future_to_page[future]
url = page['url']
print(f"处理 {i}/{total_pages}: {url}")
try:
content_data = future.result()
if content_data and "markdown_content" in content_data:
markdown_pages.append({
'url': url,
'title': page['title'],
'markdown_content': content_data['markdown_content']
})
print(f"✓ 已检索 {url} 的markdown内容")
else:
print(f"⚠ {url} 没有markdown内容")
except Exception as e:
print(f"❌ 检索 {url} 内容时出错: {str(e)}")
# 将所有markdown内容保存到一个文件中
output_file = "output/stripe_blog_markdown.md"
with open(output_file, "w", encoding="utf-8") as f:
for page in markdown_pages:
# 写入页面标题和URL
f.write(f"URL: {page['url']}\n\n")
# 写入markdown内容
f.write(f"{page['markdown_content']}\n\n")
# 在页面之间添加分隔符
f.write("---\n\n")
print(f"✓ 已添加 {page['url']} 的markdown内容")
print(f"\n✅ 处理完成!所有markdown内容已保存到 '{output_file}'")
print(f"处理的总页数: {len(markdown_pages)}")
else:
print(f"爬取失败,状态: {status_data['status']}")
示例Markdown输出
生成的markdown文件将包含所有爬取的博客内容,格式干净且结构化:URL: https://stripe.com/blog/using-ml-to-detect-and-respond-to-performance-degradations
## 使用机器学习检测和响应性能下降
作者:Jane Smith,Stripe高级工程师
在Stripe,我们每天处理数百万的API请求...
---
URL: https://stripe.com/blog/building-robust-payment-systems
## 构建强大的支付系统
作者:John Doe,工程经理
可靠性是Stripe基础设施的核心...
---
下一步
现在你已经成功爬取并提取了Stripe博客的内容,你可以:- 扩展你的爬取:修改
include_urls参数以爬取Stripe博客的其他部分 - 实现定期更新:设置一个定时任务以定期爬取新内容
- 进行更深入的分析:使用NLP工具从博客内容中提取见解
- 构建搜索引擎:创建一个可搜索的Stripe博客内容数据库
- 输入到LLMs:使用markdown内容作为上下文,让LLMs回答关于Stripe工程实践的问题