本页介绍 spider 系统的高级能力:并发控制、暂停 / 恢复、流式输出、生命周期钩子、统计信息以及日志。
Spider 系统通过三个类属性控制爬取的激进程度:
| 属性 | 默认值 | 说明 |
|---|---|---|
concurrent_requests | 4 | 同时处理的最大请求数 |
concurrent_requests_per_domain | 0 | 每个域名的最大并发请求数(0 = 不限制) |
download_delay | 0.0 | 每次请求前等待的秒数 |
robots_txt_obey | False | 是否遵守 robots.txt 规则(Disallow、Crawl-delay、Request-rate) |
class PoliteSpider(Spider): name = "polite" start_urls = ["https://example.com"]
# 对服务器更友好一些 concurrent_requests = 4 concurrent_requests_per_domain = 2 download_delay = 1.0 # 每次请求之间等待 1 秒
async def parse(self, response: Response): yield {"title": response.css("title::text").get("")}当设置了 concurrent_requests_per_domain 后,每个域名除了会受到全局并发限制外,还会拥有各自独立的并发限制器。这个特性很适合同时抓取多个域名的场景:你可以维持较高的全局并发,同时又不会对某个单独域名过于激进。
使用 uvloop
Section titled “使用 uvloop”start() 方法支持 use_uvloop 参数。如果环境中可用,它会使用更快的 uvloop / winloop 事件循环实现:
result = MySpider().start(use_uvloop=True)对于 I/O 密集型爬取任务,这通常能提升吞吐量。你需要自行额外安装 uvloop(Linux/macOS)或 winloop(Windows)。
Spider 支持通过 checkpoint 进行优雅的暂停与恢复。要启用它,只需在构造 spider 时传入 crawldir 目录:
spider = MySpider(crawldir="crawl_data/my_spider")result = spider.start()
if result.paused: print("爬取已暂停。再次运行即可恢复。")else: print("爬取完成!")- 暂停:爬取过程中按下
Ctrl+C。Spider 会等待所有进行中的请求完成,保存 checkpoint(待处理请求 + 已见请求指纹集合),然后退出。 - 强制停止:再次按下
Ctrl+C,会立刻停止,不再等待活动任务完成。 - 恢复:使用相同的
crawldir再次运行 spider。它会检测 checkpoint,恢复队列和 seen 集合,并从上次中断处继续,同时跳过start_requests()。 - 清理:当一次爬取正常完成(而不是暂停)时,checkpoint 文件会自动删除。
此外,checkpoint 还会在爬取过程中定期自动保存(默认每 5 分钟一次)。
你可以这样修改保存间隔:
# 每 2 分钟保存一次 checkpointspider = MySpider(crawldir="crawl_data/my_spider", interval=120.0)写盘过程是原子的,因此非常安全。
如何知道当前是在恢复运行
Section titled “如何知道当前是在恢复运行”on_start() 钩子会收到一个 resuming 标志:
async def on_start(self, resuming: bool = False): if resuming: self.logger.info("Resuming from checkpoint!") else: self.logger.info("Starting fresh crawl")当你反复调试 spider 的 parse() 逻辑时,每次运行都重新请求目标服务器既慢又吵。开发模式会在第一次运行时把每个响应缓存到磁盘,后续运行则直接从磁盘回放,这样你就可以反复调整选择器并重新运行 spider,而无需再发出任何网络请求。
只需在 spider 上设置 development_mode = True:
class MySpider(Spider): name = "my_spider" start_urls = ["https://example.com"] development_mode = True
async def parse(self, response: Response): yield {"title": response.css("title::text").get("")}第一次运行会正常抓取,并将每个响应保存到磁盘。此后每次运行都会直接从缓存中返回相同请求的结果,完全跳过网络层。
默认情况下,响应会被缓存到当前工作目录下的 .scrapling_cache/{spider.name}/ 中。这里说的“当前工作目录”,指的是你运行 spider 时所在的位置,而不是 spider 脚本文件所在目录。
你也可以通过 development_cache_dir 覆盖缓存位置:
class MySpider(Spider): name = "my_spider" start_urls = ["https://example.com"] development_mode = True development_cache_dir = "/tmp/my_spider_cache"- 缓存键:每个响应都以请求的 fingerprint 为键,因此只要你修改了会影响指纹的属性(
fp_include_kwargs、fp_include_headers、fp_keep_fragments),就会触发一次新的抓取。 - 存储格式:每个响应对应一个 JSON 文件,命名格式为
{fingerprint_hex}.json。响应体会以 base64 编码保存,因此二进制内容也能被原样保留。写入过程为原子写入(临时文件 + rename)。 - 回放:命中缓存时,引擎会完全跳过网络层,包括
download_delay、速率限制以及is_blocked()的重试逻辑。缓存响应会直接传给你的回调。 - 统计:缓存请求仍会计入
requests_count、response_bytes和各状态码计数,因此统计输出与正常爬取保持一致。额外还有两个计数器:cache_hits与cache_misses,方便你查看缓存命中情况。
缓存不会自动过期。如果你想强制重新抓取,请删除缓存目录,或者直接调用 manager 的 clear() 方法。
对于长时间运行的 spider,或者需要实时获取抓取结果的应用,请使用 stream(),而不是 start():
import anyio
async def main(): spider = MySpider() async for item in spider.stream(): print(f"Got item: {item}") # 读取实时统计信息 print(f"Items so far: {spider.stats.items_scraped}") print(f"Requests made: {spider.stats.requests_count}")
anyio.run(main)与 start() 的主要区别:
stream()必须在异步上下文中调用- 条目会在抓取完成后逐个 yield,而不是最后统一收集成列表
- 你可以在迭代过程中通过
spider.stats读取实时统计信息
它也可以与 checkpoint 系统一起使用,因此很适合基于 spider 构建 UI——例如能实时显示数据,并支持暂停 / 恢复的界面。
import anyio
async def main(): spider = MySpider(crawldir="crawl_data/my_spider") async for item in spider.stream(): print(f"Got item: {item}") # 读取实时统计信息 print(f"Items so far: {spider.stats.items_scraped}") print(f"Requests made: {spider.stats.requests_count}")
anyio.run(main)你也可以在上述代码中调用 spider.pause() 来关闭 spider。如果没有启用 checkpoint 系统,它就只会单纯结束本次爬取。
生命周期钩子
Section titled “生命周期钩子”Spider 提供了多个可重写钩子,让你能够在爬取不同阶段注入自定义行为。
on_start
Section titled “on_start”在爬取开始前调用。适合做初始化任务,例如加载数据或准备资源:
async def on_start(self, resuming: bool = False): self.logger.info("Spider starting up") # 从数据库加载种子 URL、初始化计数器等on_close
Section titled “on_close”在爬取结束后调用(无论是正常完成还是暂停)。适合做清理工作:
async def on_close(self): self.logger.info("Spider shutting down") # 关闭数据库连接、刷新缓冲区等on_error
Section titled “on_error”当某个请求因异常失败时调用。可用于错误跟踪或自定义恢复逻辑:
async def on_error(self, request: Request, error: Exception): self.logger.error(f"Failed: {request.url} - {error}") # 上报到错误跟踪系统、保存失败 URL 以便稍后重试等on_scraped_item
Section titled “on_scraped_item”每抓取到一个条目、且在它被加入最终结果前调用。返回该条目(无论是否修改)表示保留;返回 None 则表示丢弃:
async def on_scraped_item(self, item: dict) -> dict | None: # 丢弃没有标题的条目 if not item.get("title"): return None
# 修改条目(例如增加时间戳) item["scraped_at"] = "2026-01-01" return itemstart_requests
Section titled “start_requests”如果你想自定义初始请求生成逻辑,而不是依赖 start_urls,可以重写 start_requests():
async def start_requests(self): # 先发送一个 POST 登录请求 yield Request( "https://example.com/login", method="POST", data={"user": "admin", "pass": "secret"}, callback=self.after_login, )
async def after_login(self, response: Response): # 然后再抓取登录后的页面 yield response.follow("/dashboard", callback=self.parse)结果与统计信息
Section titled “结果与统计信息”start() 返回的 CrawlResult 同时包含抓取结果和详细统计信息:
result = MySpider().start()
# 条目print(f"Total items: {len(result.items)}")result.items.to_json("output.json", indent=True)
# 是否完成print(f"Completed: {result.completed}")print(f"Paused: {result.paused}")
# 统计信息stats = result.statsprint(f"Requests: {stats.requests_count}")print(f"Failed: {stats.failed_requests_count}")print(f"Blocked: {stats.blocked_requests_count}")print(f"Offsite filtered: {stats.offsite_requests_count}")print(f"Robots.txt disallowed: {stats.robots_disallowed_count}")print(f"Cache hits: {stats.cache_hits}")print(f"Cache misses: {stats.cache_misses}")print(f"Items scraped: {stats.items_scraped}")print(f"Items dropped: {stats.items_dropped}")print(f"Response bytes: {stats.response_bytes}")print(f"Duration: {stats.elapsed_seconds:.1f}s")print(f"Speed: {stats.requests_per_second:.1f} req/s")CrawlStats 对象会跟踪更细粒度的信息:
stats = result.stats
# 状态码分布print(stats.response_status_count)# {'status_200': 150, 'status_404': 3, 'status_403': 1}
# 各域名下载的字节数print(stats.domains_response_bytes)# {'example.com': 1234567, 'api.example.com': 45678}
# 各会话的请求次数print(stats.sessions_requests_count)# {'http': 120, 'stealth': 34}
# 本次爬取用到的代理print(stats.proxies)# ['http://proxy1:8080', 'http://proxy2:8080']
# 各日志级别计数print(stats.log_levels_counter)# {'debug': 200, 'info': 50, 'warning': 3, 'error': 1, 'critical': 0}
# 时间信息print(stats.start_time) # 爬取开始时的 Unix 时间戳print(stats.end_time) # 爬取结束时的 Unix 时间戳print(stats.download_delay) # 实际使用的下载延迟(秒)
# 实际使用的并发配置print(stats.concurrent_requests) # 全局并发限制print(stats.concurrent_requests_per_domain) # 每域名并发限制
# 自定义统计(由你的 spider 代码写入)print(stats.custom_stats)# {'login_attempts': 3, 'pages_with_errors': 5}
# 导出完整字典print(stats.to_dict())Spider 内置了一个可通过 self.logger 访问的日志器。它已按 spider 名称预先配置,并支持以下自定义选项:
| 属性 | 默认值 | 说明 |
|---|---|---|
logging_level | logging.DEBUG | 最低日志级别 |
logging_format | "[%(asctime)s]:({spider_name}) %(levelname)s: %(message)s" | 日志消息格式 |
logging_date_format | "%Y-%m-%d %H:%M:%S" | 日志消息中的日期格式 |
log_file | None | 日志文件路径(会在控制台输出之外额外写入文件) |
import logging
class MySpider(Spider): name = "my_spider" start_urls = ["https://example.com"] logging_level = logging.INFO log_file = "logs/my_spider.log"
async def parse(self, response: Response): self.logger.info(f"Processing {response.url}") yield {"title": response.css("title::text").get("")}如果日志目录不存在,系统会自动创建。控制台输出和文件输出会使用同一套格式。