メインコンテンツへスキップ

概要

このガイドでは以下のことを学びます:
  • Stripeのブログ投稿をターゲットにしたクロールを開始する
  • クロールの進行状況を監視する
  • クロールしたコンテンツを取得して処理する

Stripeのブログページをクロールする

Stripeのブログページをクロールするには、特定のブログURLをターゲットにするためにパターンマッチングを使用してクロールエンドポイントを使用します。これにより、各ページの完全なHTMLコンテンツが取得され、必要な情報を抽出するために処理できます。
import requests
import time
import json
from datetime import datetime

# 設定
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {API_KEY}'
}

# クロール期間追跡のために開始時間を記録
crawl_start_time = time.time()

print(f"[{datetime.now().strftime('%H:%M:%S')}] Stripeブログのクロールを開始します...")

# Stripeのエンジニアリングブログ投稿に焦点を当てたクロールを開始
# 特定の興味に基づいてパターンを調整できます
payload = {
    "start_url": "https://stripe.com/blog",
    "include_urls": ["/blog/engineering/**"],  # エンジニアリング投稿に焦点を当てる
    "max_pages": 25  # この例では25ページに制限
}

# クロールを開始
print("Stripeのエンジニアリングブログ投稿のクロールを開始します...")
response = requests.post(f'{API_URL}/crawls', headers=HEADERS, json=payload)
data = response.json()
crawl_id = data['id']
print(f"クロールがID: {crawl_id}で開始されました")

# クロールの進行状況を監視
while True:
    status_response = requests.get(f'{API_URL}/crawls/{crawl_id}', headers=HEADERS)
    status_data = status_response.json()
    print(f"クロールステータス: {status_data['status']} - クロールされたページ数: {status_data.get('pages_count', 0)}")
    
    if status_data['status'] == 'completed' or status_data['status'] == 'failed':
        break
        
    # 再確認する前に5秒待機
    time.sleep(5)

# クロール期間を計算して表示
crawl_duration = time.time() - crawl_start_time
print(f"[{datetime.now().strftime('%H:%M:%S')}] クロールが{crawl_duration:.2f}秒で完了しました")

ブログコンテンツをMarkdownに変換する

クロールしたコンテンツを利用する強力な方法の一つは、それをMarkdown形式に変換することです。これはLLMに入力したり、ナレッジベースを作成するのに理想的です。ブログコンテンツを取得してMarkdownに変換する方法を以下に示します:
import requests
import time
import json
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

# 設定
API_URL = 'https://api.olostep.com/v1'
API_KEY = '<your_olostep_api_key>'
HEADERS = {
    'Content-Type': 'application/json',
    'Authorization': f'Bearer {API_KEY}'
}

# Markdown形式でコンテンツを取得する関数
def retrieve_content(retrieve_id, formats):
    params = {
        "retrieve_id": retrieve_id,
        "formats": json.dumps(formats)
    }
    response = requests.get(f"{API_URL}/retrieve", headers=HEADERS, params=params)
    return response.json()

# 前のクロール例から続行
if status_data['status'] == 'completed':
    print(f"\nクロール完了!{status_data['pages_count']}ページを取得しました。")
    pages_response = requests.get(f'{API_URL}/crawls/{crawl_id}/pages', headers=HEADERS)
    pages_data = pages_response.json()
    
    # 出力ディレクトリが存在しない場合は作成
    os.makedirs("output", exist_ok=True)
    
    # Markdownコンテンツを収集する準備
    markdown_pages = []
    total_pages = len(pages_data['pages'])
    
    # ページを並行して処理してMarkdownコンテンツを取得
    with ThreadPoolExecutor(max_workers=10) as executor:
        # コンテンツ取得のためのフューチャーを作成
        future_to_page = {
            executor.submit(retrieve_content, page['retrieve_id'], ["markdown"]): page
            for page in pages_data['pages']
        }
        
        # 完了した結果を処理
        for i, future in enumerate(as_completed(future_to_page), 1):
            page = future_to_page[future]
            url = page['url']
            print(f"処理中 {i}/{total_pages}: {url}")
            
            try:
                content_data = future.result()
                if content_data and "markdown_content" in content_data:
                    markdown_pages.append({
                        'url': url,
                        'title': page['title'],
                        'markdown_content': content_data['markdown_content']
                    })
                    print(f"✓ {url}のMarkdownコンテンツを取得しました")
                else:
                    print(f"⚠ {url}のMarkdownコンテンツがありません")
            except Exception as e:
                print(f"❌ {url}のコンテンツ取得エラー: {str(e)}")
    
    # すべてのMarkdownコンテンツを単一ファイルに保存
    output_file = "output/stripe_blog_markdown.md"
    
    with open(output_file, "w", encoding="utf-8") as f:
        for page in markdown_pages:
            # タイトルとURLを含むページヘッダーを書き込み
            f.write(f"URL: {page['url']}\n\n")
            
            # Markdownコンテンツを書き込み
            f.write(f"{page['markdown_content']}\n\n")
            
            # ページ間の区切りを追加
            f.write("---\n\n")
            
            print(f"✓ {page['url']}からのMarkdownコンテンツを追加しました")

    print(f"\n✅ 処理完了!すべてのMarkdownコンテンツが'{output_file}'に保存されました")
    print(f"処理されたページ数: {len(markdown_pages)}")
else:
    print(f"クロールがステータス: {status_data['status']}で失敗しました")

Markdown出力例

生成されたMarkdownファイルには、クロールされたブログコンテンツがクリーンで構造化された形式で含まれます:
URL: https://stripe.com/blog/using-ml-to-detect-and-respond-to-performance-degradations

## Using ML to detect and respond to performance degradations

By Jane Smith, Senior Engineer at Stripe

At Stripe, we process millions of API requests every day...

---

URL: https://stripe.com/blog/building-robust-payment-systems

## Building a robust payment system

By John Doe, Engineering Manager

Reliability is at the core of Stripe's infrastructure...

---

次のステップ

Stripeのブログからコンテンツをクロールして抽出することに成功したので、次のことができます:
  1. クロールを拡張する: include_urlsパラメータを変更してStripeのブログの他のセクションをクロールする
  2. 定期的な更新を実施する: 新しいコンテンツを定期的にクロールするためのスケジュールされたジョブを設定する
  3. より深い分析を行う: NLPツールを使用してブログコンテンツから洞察を抽出する
  4. 検索エンジンを構築する: Stripeのブログコンテンツの検索可能なデータベースを作成する
  5. LLMにフィードする: MarkdownコンテンツをLLMに入力してStripeのエンジニアリングプラクティスに関する質問に答える
Olostepのコンテンツクロール機能を使用して、任意のウェブサイトのコンテンツ戦略を監視および分析するための強力なツールを構築できます。