
超越 Pandas:7 种针对大型数据集的高级数据处理技术
图片由 Editor | ChatGPT 提供
引言
如果你使用 Python 处理过数据,很可能多次使用过 Pandas。这是有充分理由的;它直观、灵活,非常适合日常分析。但随着数据集开始增长,Pandas 开始暴露出它的局限性。可能是内存问题、性能迟缓,或者当你尝试对数百万行进行分组时,你的机器听起来就像要起飞一样。
这正是许多数据分析师和科学家开始提出相同问题的时候:还有什么其他的选择?
本文旨在回答这个问题。我们将探讨七种超越 Pandas 的工具和技术,它们专为更大的数据、更快的执行和更高效的管道而构建。读完本文,你将知道何时切换工具,选择什么工具,以及如何在自己的工作流程中实际使用它们。
1. 使用 Dask 进行并行化 DataFrame
Pandas 很好用,直到你处理的数据集不适合内存。一旦碰到这个障碍,事情很快就会变得一团糟。这就是 Dask 的用武之地。
Dask 的设计让 Pandas 用户感到熟悉,但在幕后,它将数据分成块并并行处理。它不会试图一次性加载所有内容。相反,它按需处理数据块,甚至可以将计算分布到多个核心或机器上。你无需重写整个工作流程即可获得巨大的性能提升。
Dask 的真正魔力来自三点:
- 惰性评估:Dask 不会立即运行操作。它会构建一个任务图,只在需要时执行,这意味着更少的浪费计算。
- 核外计算:通过从磁盘流式传输数据集,你可以处理大于 RAM 的数据集。
- 并行执行:过滤、连接和分组等操作可以在多个 CPU 核心上运行,而无需你付出额外的努力。
这是一个简单的示例,展示了如何加载大型 CSV 并运行分组操作,就像你在 Pandas 中会做的那样:
1 2 3 4 5 6 7 8 9 |
import dask.dataframe as dd # 默认情况下惰性加载数据 df = dd.read_csv("large_file.csv") # 按组计算平均值 result = df.groupby("category")["sales"].mean().compute() print(result) |
注意末尾的 .compute()
;那才是 Dask 实际触发计算的地方。在此之前,它只是在构建需要发生什么的“食谱”。
2. 使用 Polars 加速
如果 Dask 帮助你通过并行化拓宽范围,那么 Polars 则帮助你更快地前进。它不仅仅是为了快而快;它的速度快得像作弊一样。
Polars 是一个用 Rust 编写的 DataFrame 库,它利用 Apache Arrow 内存格式实现了惊人的加速。这意味着当你运行查询时,它会编译一个计划,在多个线程上运行它,并在 Pandas 甚至完成解析你的 CSV 之前交付结果。
但它的突出之处在于:Polars 不仅仅关乎速度。它还关乎管道思维。你不是编写逐行脚本;你构建一个转换链,只在需要时才执行。
让我们看一个使用 Polars 惰性 API 的例子。我们将读取一个大文件,过滤它,并运行一个 groupby
聚合,所有这些都无需一次性将所有内容加载到内存中:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import polars as pl # 惰性加载和转换 result = ( pl.read_csv("large_file.csv").lazy() .filter(pl.col("sales") > 1000) .groupby("region") .agg(pl.col("sales").mean().alias("avg_sales")) .sort("avg_sales", descending=True) .collect() # 触发执行 ) print(result) |
注意到末尾的 collect()
了吗?那就是所有内容运行的时刻。在此之前,你只是在构建一个计划。因为 Polars 会编译该计划,所以它运行得紧凑而干净,没有 Python 瓶颈,没有逐行迭代。
当你需要在单台机器上获得最大速度时,Polars 才能真正发挥作用。想想仪表板、分析管道或在将大型导出数据输入模型之前进行清理。如果你在使用 Pandas 时遇到性能瓶颈,并且不想启动集群或搞乱并行引擎,那么这就是你的捷径。
另外值得注意的是:语法很简洁。一旦你习惯了表达式思维,它会比编写循环和链式 Pandas 调用感觉更清晰。
3. 使用 Apache Arrow
有时速度不是关于更快地处理数字。有时是关于在不必移动数据时不要移动数据。这正是 Apache Arrow 的用武之地。
Arrow 通常不是你直接与之交互的工具,像 Pandas 或 Polars 那样;它更像是底层管道。但一旦你理解了它的作用,你就会开始明白它如何融入许多现代工作流程。
从核心来看,Arrow 是一种列式内存格式。这意味着它以列(而不是行)存储数据,并且其布局旨在实现速度,不仅在一种工具内部,而且在工具之间。因此,当 Pandas、Polars、PySpark 甚至数据库引擎都支持 Arrow 时,它们可以在彼此之间传递数据,而无需复制或转换。没有序列化,也没有等待。
让我们让它变得实用。假设你有一个 Pandas DataFrame,并希望将其移动到 Polars 以进行更快的 groupby
或将其导出,以便其他服务可以在不重新处理的情况下使用它。
这可能看起来像这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
import pandas as pd import pyarrow as pa import polars as pl # 创建示例 Pandas DataFrame df_pandas = pd.DataFrame({ "region": ["West", "East", "South", "West"], "sales": [100, 200, 150, 300] }) # 转换为 Arrow Table arrow_table = pa.Table.from_pandas(df_pandas) # 直接从 Arrow 加载到 Polars df_polars = pl.from_arrow(arrow_table) print(df_polars) |
那个小小的 from_arrow()
步骤跳过了你在转换格式时通常会遇到的所有开销。它是即时的、零拷贝的,并且内存高效。
当你关注边界性能时,使用 Arrow:在工具之间移动、读/写文件或连接到云服务。你不会直接使用它来分析数据,但当它在后台工作时,你的整个管道会运行得更流畅。
4. 使用 SQL 引擎查询(例如 DuckDB, SQLite)
有时,处理大数据最快的方法不是 Pandas、Polars 或 Spark;而是老式 SQL。这不仅仅是怀旧。SQL 仍然是运行分析查询最有效的方法之一,尤其是当你只需要过滤、分组或连接大型数据集而无需启动整个处理管道时。
DuckDB 就像 SQLite,但它针对分析工作负载进行了优化。它可以直接查询 Parquet 文件、CSV 或 Arrow 表,而无需将它们作为 DataFrame 加载到内存中。而且它完全在你的机器上运行,没有服务器,没有集群,无需设置。只需安装软件包并编写 SQL。
这是一个例子。假设你有一个很大的 Parquet 文件,并且想要找到每个类别的平均销售额,并按日期过滤:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import duckdb # 直接查询 Parquet 文件 query = """ SELECT category, AVG(sales) AS avg_sales FROM 'data/transactions.parquet' WHERE transaction_date >= '2023-01-01' GROUP BY category ORDER BY avg_sales DESC """ result = duckdb.query(query).to_df() print(result) |
无需先用 Pandas 加载文件;DuckDB 负责解析、过滤、分组,并且它做得很快。它还很聪明地将过滤器下推到文件级别(特别是对于 Parquet),因此它不会浪费时间扫描不需要的内容。
如果你在一个以 SQL 为通用语言的团队中工作,那么这是一种在扩展到完整数据库之前在本地进行原型设计的好方法。
何时使用 DuckDB:
- 你的数据对于 Pandas 来说太大了,但你还没有准备好使用 Spark
- 你想直接从磁盘(CSV、Parquet、Arrow)查询数据
- 你需要快速、可重复、基于 SQL 的分析,而无需设置基础设施
5. 使用 PySpark 进行分布式处理
在某个时候,你的数据集会变得如此之大,以至于它不仅仅是让你的机器变慢;它会使其崩溃。那时就是拿出 PySpark 的时候了。
PySpark 为你提供了 Apache Spark 的强大功能以及 Python 接口。它专为分布式计算而构建,这意味着它可以在多个核心或机器上处理数据,而无需你管理底层管道。如果你正在处理数据湖、数十亿行数据或生产级管道,那么这是你需要的工具。
Spark 提供了两种主要的工作方式:RDDs(弹性分布式数据集)和 DataFrames。除非你有非常具体的理由进行低级操作,否则请坚持使用 DataFrames。它们经过优化,更易于编写,如果你使用过 Pandas 或 SQL,它们会更熟悉。
让我们来看一个基本的 PySpark 管道,读取一个大型 CSV 文件,进行过滤和聚合:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
from pyspark.sql import SparkSession from pyspark.sql.functions import col, avg # 启动 Spark 会话 spark = SparkSession.builder.appName("SalesAnalysis").getOrCreate() # 读取大型 CSV 文件 df = spark.read.csv("large_sales.csv", header=True, inferSchema=True) # 过滤和分组 result = ( df.filter(col("region") == "West") .groupBy("category") .agg(avg("sales").alias("avg_sales")) .orderBy("avg_sales", ascending=False) ) # 显示结果 result.show() |
这看起来很像 Pandas,但执行方式完全不同。操作是惰性评估的,并在后台进行优化,然后才在整个集群(甚至在本地,如果你只是测试的话)上运行。
何时使用 PySpark:
- 你的数据无法在一台机器上容纳
- 你正在分布式环境(如云平台或本地集群)中工作
- 你的工作流程需要内置的容错、重试逻辑和可扩展性
需要注意的一点:PySpark 对于小型任务来说并非总是最快的选择。当规模和弹性比便利性或启动时间更重要时,它才能发挥作用。
6. 使用 Vaex 优化
如果你曾尝试将一个巨大的 CSV 文件加载到 Pandas 中,并眼睁睁看着你的 RAM 在几秒钟内消失,那么你并非孤单。大多数 DataFrame 工具都试图将所有内容保存在内存中,当处理亿万行或更多数据时,这根本无法扩展。
Vaex 是一个专门为高性能、单机分析而构建的 DataFrame 库。它速度快、内存效率高,并且以惰性方式构建,不是以一种不好的方式,而是以一种除非你真正需要,否则不要加载它的方式。
Vaex 的独特之处在于它使用内存映射文件。它不是将整个数据集加载到 RAM 中,而是按需直接从磁盘读取数据块。将其与其优化的后端(用 C++ 编写)结合起来,即使在大型数据集上,你也能获得闪电般的过滤、聚合和连接操作。
在实践中,这看起来像这样:
1 2 3 4 5 6 7 8 9 10 11 12 |
import vaex # 打开一个大型 CSV 文件(或 HDF5、Arrow、Parquet 等) df = vaex.open("big_data.csv") # 这可能超过 1 亿行 # 惰性过滤和分组 result = ( df[df.sales > 1000] .groupby("category", agg=vaex.agg.mean("sales")) ) print(result) |
就是这样;不需要 .compute()
调用。Vaex 在幕后找出最佳执行路径,并且只在必要时运行它。它还会缓存结果并最大限度地减少不必要的重新计算。
何时应该使用 Vaex?
- 你正在处理大型数据集(数千万或数亿行),但不想承担 Dask 或 Spark 的开销
- 你想要闪电般的分析式查询:过滤器、分箱、直方图、连接
- 你需要一种高效的工具,可以在单台机器上运行而不会耗尽所有 RAM
7. 使用生成器进行分块和流式处理
有时,最明智的做法是采用低级方法。当你的机器因文件而阻塞,并且所有工具都感觉太重时,Python 的内置工具仍然可以派上用场。在内存极度紧张的情况下——也许你正在嵌入式设备上工作,在 Lambda 函数中运行,或者只是试图避免将数百兆字节加载到 RAM 中——你甚至可能根本不需要 DataFrame。
生成器和文件迭代器允许你一次流式传输一行数据。你不会加载文件;你会在处理过程中对其进行处理。这意味着内存使用量保持恒定,没有峰值,并且完全控制接下来发生的事情。
这是一个简单的例子:逐行读取大型 CSV 文件,过滤行,并在处理过程中进行一些简单的转换:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
def process_csv(path): with open(path, "r") as f: header = next(f).strip().split(",") for line in f: row = line.strip().split(",") record = dict(zip(header, row)) # 示例转换 if float(record["sales"]) > 1000: yield { "region": record["region"], "sales": float(record["sales"]) * 1.1 # 应用加价 } # 用法 for item in process_csv("large_file.csv"): print(item) |
没有 Pandas。没有库。只是干净的迭代,完全可控。你什么时候应该走这条路?
- 你正在内存受限的环境中运行,例如云函数或边缘计算
- 你需要逐行流式传输数据,可能传输到另一个系统(例如数据库、API 或文件)
- 你想构建自己的轻量级 ETL 管道,而没有大型框架的开销
总结
本文概述的目的不是选出赢家。而是为任务选择正确的工具。
如果你想以最少的更改扩展你的 Pandas 代码,Dask 是一个可靠的第一步。对于单机上的纯速度,Polars 惊人的快。如果你关心互操作性或在系统之间传递数据,Arrow 可以保持简洁高效。喜欢 SQL?DuckDB 允许你对大型文件运行分析查询,而无需设置数据库。
处理海量数据集或企业级管道?那是 PySpark 的领域。无需离开你的笔记本电脑即可进行快速分析?Vaex 以 Pandas 的感觉为你提供内存映射性能。当你想保持简单和紧凑时,Python 自己的生成器可以逐行流式传输数据而无需费力。
有时,结合使用工具是“最佳”方法:将 Arrow 与 Polars 结合使用,或将文件加载到 DuckDB 中并导出到 Arrow 以进行管道中的下一步操作。你不必只在一个生态系统中。你也不需要完全放弃 Pandas。
重要的是了解你的选择,并构建一个与你的数据协同工作而非对抗的堆栈。
尝试一下。混合搭配。看看哪种方法最适合你。
暂无评论。