跳转到主要内容

概述

本指南将向你展示如何:
  • 启动一个专门针对 Stripe 博客文章的爬取
  • 监控爬取进度
  • 检索并处理爬取的内容

爬取 Stripe 的博客页面

要爬取 Stripe 的博客页面,使用爬取端点并通过模式匹配来定位特定的博客 URL。这将获取每个页面的完整 HTML 内容,然后你可以处理这些内容以提取所需的信息。
import requests
import time
import json
from datetime import datetime

# 配置
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {API_KEY}'
}

# 记录爬取开始时间以跟踪持续时间
crawl_start_time = time.time()

print(f"[{datetime.now().strftime('%H:%M:%S')}] 开始爬取 Stripe 博客...")

# 开始一个专注于 Stripe 工程博客文章的爬取
# 你可以根据具体兴趣调整模式
payload = {
    "start_url": "https://stripe.com/blog",
    "include_urls": ["/blog/engineering/**"],  # 专注于工程文章
    "max_pages": 25  # 此示例限制为 25 页
}

# 开始爬取
print("开始爬取 Stripe 的工程博客文章...")
response = requests.post(f'{API_URL}/crawls', headers=HEADERS, json=payload)
data = response.json()
crawl_id = data['id']
print(f"爬取已启动,ID: {crawl_id}")

# 监控爬取进度
while True:
    status_response = requests.get(f'{API_URL}/crawls/{crawl_id}', headers=HEADERS)
    status_data = status_response.json()
    print(f"爬取状态: {status_data['status']} - 已爬取页面: {status_data.get('pages_count', 0)}")
    
    if status_data['status'] == 'completed' or status_data['status'] == 'failed':
        break
        
    # 等待 5 秒后再次检查
    time.sleep(5)

# 计算并显示爬取持续时间
crawl_duration = time.time() - crawl_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] 爬取在 {crawl_duration:.2f} 秒内完成")

将博客内容转换为 Markdown

使用爬取的内容的一个强大方式是将其转换为 markdown 格式,这对于输入到 LLMs 或创建知识库非常理想。以下是如何检索并将博客内容转换为 markdown:
import requests
import time
import json
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

# 配置
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {API_KEY}'
}

# 函数:以 markdown 格式检索内容
def retrieve_content(retrieve_id, formats):
    params = {
        "retrieve_id": retrieve_id,
        "formats": json.dumps(formats)
    }
    response = requests.get(f"{API_URL}/retrieve", headers=HEADERS, params=params)
    return response.json()

# 继续前面的爬取示例
if status_data['status'] == 'completed':
    print(f"\n爬取完成!已检索 {status_data['pages_count']} 页。")
    pages_response = requests.get(f'{API_URL}/crawls/{crawl_id}/pages', headers=HEADERS)
    pages_data = pages_response.json()
    
    # 如果输出目录不存在则创建
    os.makedirs("output", exist_ok=True)
    
    # 准备收集 markdown 内容
    markdown_pages = []
    total_pages = len(pages_data['pages'])
    
    # 并行处理页面以获取 markdown 内容
    with ThreadPoolExecutor(max_workers=10) as executor:
        # 为内容检索创建 future
        future_to_page = {
            executor.submit(retrieve_content, page['retrieve_id'], ["markdown"]): page
            for page in pages_data['pages']
        }
        
        # 处理完成的结果
        for i, future in enumerate(as_completed(future_to_page), 1):
            page = future_to_page[future]
            url = page['url']
            print(f"处理 {i}/{total_pages}: {url}")
            
            try:
                content_data = future.result()
                if content_data and "markdown_content" in content_data:
                    markdown_pages.append({
                        'url': url,
                        'title': page['title'],
                        'markdown_content': content_data['markdown_content']
                    })
                    print(f"✓ 已检索 {url} 的 markdown 内容")
                else:
                    print(f"⚠ {url} 没有 markdown 内容")
            except Exception as e:
                print(f"❌ 检索 {url} 内容时出错: {str(e)}")
    
    # 将所有 markdown 内容保存到单个文件
    output_file = "output/stripe_blog_markdown.md"
    
    with open(output_file, "w", encoding="utf-8") as f:
        for page in markdown_pages:
            # 写入页面标题和 URL
            f.write(f"URL: {page['url']}\n\n")
            
            # 写入 markdown 内容
            f.write(f"{page['markdown_content']}\n\n")
            
            # 添加页面之间的分隔符
            f.write("---\n\n")
            
            print(f"✓ 已添加来自 {page['url']} 的 markdown 内容")

    print(f"\n✅ 处理完成!所有 markdown 内容已保存到 '{output_file}'")
    print(f"处理的总页面数: {len(markdown_pages)}")
else:
    print(f"爬取失败,状态: {status_data['status']}")

示例 Markdown 输出

生成的 markdown 文件将包含所有爬取的博客内容,格式整洁且结构化:
URL: https://stripe.com/blog/using-ml-to-detect-and-respond-to-performance-degradations

## 使用机器学习检测和响应性能下降

作者:Jane Smith,Stripe 高级工程师

在 Stripe,我们每天处理数百万的 API 请求...

---

URL: https://stripe.com/blog/building-robust-payment-systems

## 构建稳健的支付系统

作者:John Doe,工程经理

可靠性是 Stripe 基础设施的核心...

---

下一步

现在你已经成功爬取并提取了 Stripe 博客的内容,你可以:
  1. 扩展你的爬取:修改 include_urls 参数以爬取 Stripe 博客的其他部分
  2. 实现定期更新:设置定时任务以定期爬取新内容
  3. 进行深入分析:使用 NLP 工具从博客内容中提取见解
  4. 构建搜索引擎:创建一个可搜索的 Stripe 博客内容数据库
  5. 输入到 LLMs:使用 markdown 内容作为上下文,让 LLMs 回答关于 Stripe 工程实践的问题
利用 Olostep 的内容爬取能力,你可以构建强大的工具来监控和分析任何网站的内容策略。