# duckdb，单机高效处理千万级以上数据

当你用pandas处理超过几百万甚至几千万上亿行的数据感觉太慢，又不太想搞pyspark，不妨试试duckdb.

它在本地单机运行，可以像spark那样使用sql语句进行数据分析和数据转换，并且性能非常高。

当处理几千万行以上的数据时，它的效率通常是pandas的几十几百倍。

难能可贵的是，这个库的用法非常简单，核心API只有以下几个。


```python
import duckdb

#输入建表：可以从parquet,pandas dataframe建表
tb = duckdb.read_parquet('input_data.parquet') 
tb = duckdb.from_df(df) #从pandas转换

#分析转换：执行sql，支持自定义函数UDF
tb2 = duckdb.sql('select * from tb  where val>1000 and val<10000  order by val')
duckdb.create_function('myfunc',myfunc,[duckdb.typing.VARCHAR],duckdb.typing.VARCHAR)  

#输出落表: 推荐导出成parquet格式，效率最高
tb2.to_parquet('output_data.parquet')
df2 = tb2.to_df() #也可以转换成pandas
```


In [4]:
!pip install duckdb  -i https://pypi.tuna.tsinghua.edu.cn/simple   

In [76]:
!pip install Faker  -i https://pypi.tuna.tsinghua.edu.cn/simple   

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting Faker
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/a7/0b/57ac98ca6aa8cd54246b440bc2aa8549e5dcbb99a72a399cab6d66c7f198/Faker-26.0.0-py3-none-any.whl (1.8 MB)
[K     |████████████████████████████████| 1.8 MB 3.2 MB/s eta 0:00:01
Installing collected packages: Faker
Successfully installed Faker-26.0.0


## 一，输入建表

In [1]:
import numpy as np
import pandas as pd
import duckdb 

In [2]:
#生成示例数据(5000万行)
dfdemo = pd.DataFrame(
    {
        'category': np.random.choice(list('ABCDEF'), 50000000),
        'val': np.round(np.random.uniform(0, 1000000, 50000000), 3)
    }
)
dfdemo.to_parquet('dfdemo.parquet', index=False)


对比读取数据速度，duckdb比pandas快几十倍

In [3]:
%%time 
df = pd.read_parquet('dfdemo.parquet')

CPU times: user 2.48 s, sys: 497 ms, total: 2.98 s
Wall time: 2.08 s


In [4]:
%%time
tb = duckdb.read_parquet('dfdemo.parquet')

CPU times: user 1.99 ms, sys: 31.5 ms, total: 33.5 ms
Wall time: 77.1 ms


In [5]:
tb.columns 

['category', 'val']

## 二，SQL分析

### 1， 基本查询

#### where查询对比 (duckdb比pandas快20倍)

In [6]:
%%time 
# where 查询
duckdb.sql('select * from tb  where val>1000 and val<10000  order by val limit 5')

CPU times: user 496 µs, sys: 1.22 ms, total: 1.72 ms
Wall time: 20.4 ms


┌──────────┬──────────┐
│ category │   val    │
│ varchar  │  double  │
├──────────┼──────────┤
│ F        │ 1000.002 │
│ E        │ 1000.113 │
│ B        │ 1000.115 │
│ B        │ 1000.122 │
│ D        │ 1000.125 │
└──────────┴──────────┘

In [7]:
%%time 
df.loc[(df['val']>1000)&(df['val']<10000),:].sort_values('val').head(5)

CPU times: user 296 ms, sys: 105 ms, total: 401 ms
Wall time: 399 ms


Unnamed: 0,category,val
35699151,F,1000.002
37196599,E,1000.113
38865354,B,1000.115
14015201,B,1000.122
24024316,D,1000.125


#### groupby 操作对比 (duckdb比pandas快40倍)

In [8]:
%%time 
# groupby 操作
duckdb.sql('select category, count(val) as rows, mean(val) as avg_val from tb group by category')

CPU times: user 1.85 ms, sys: 17 ms, total: 18.8 ms
Wall time: 50.2 ms


┌──────────┬─────────┬────────────────────┐
│ category │  rows   │      avg_val       │
│ varchar  │  int64  │       double       │
├──────────┼─────────┼────────────────────┤
│ B        │ 8328359 │  500029.4997295229 │
│ A        │ 8334826 │ 499895.93226725736 │
│ D        │ 8329469 │   500127.047667208 │
│ E        │ 8334574 │  499913.2748367591 │
│ C        │ 8334322 │  499964.2243027494 │
│ F        │ 8338450 │ 500146.17542576144 │
└──────────┴─────────┴────────────────────┘

In [10]:
%%time 
df.groupby('category').agg({'val':['count','mean']})

CPU times: user 2.13 s, sys: 230 ms, total: 2.36 s
Wall time: 2.35 s


Unnamed: 0_level_0,val,val
Unnamed: 0_level_1,count,mean
category,Unnamed: 1_level_2,Unnamed: 2_level_2
A,8334826,499895.932267
B,8328359,500029.49973
C,8334322,499964.224303
D,8329469,500127.047667
E,8334574,499913.274837
F,8338450,500146.175426


#### join 操作对比 (duckdb比pandas快170倍)

In [12]:
dfcolor = pd.DataFrame(
    {
        'cat': ['A','B','C','D','E','F'],
        'color':['black','yellow','pink','blue','white','green'] 
    }
)

In [11]:
%%time
tb_color = duckdb.from_df(dfcolor)
duckdb.sql('select tb.category, tb.val, tb_color.color from tb join tb_color on tb.category = tb_color.cat')

CPU times: user 4.16 ms, sys: 17.7 ms, total: 21.9 ms
Wall time: 35.4 ms


┌──────────┬────────────┬─────────┐
│ category │    val     │  color  │
│ varchar  │   double   │ varchar │
├──────────┼────────────┼─────────┤
│ D        │ 869039.467 │ blue    │
│ B        │ 353716.066 │ yellow  │
│ C        │ 180675.829 │ pink    │
│ D        │ 964817.406 │ blue    │
│ E        │ 756984.249 │ white   │
│ C        │ 528946.308 │ pink    │
│ C        │ 513503.377 │ pink    │
│ F        │ 834033.897 │ green   │
│ E        │  919872.73 │ white   │
│ A        │ 699220.429 │ black   │
│ ·        │      ·     │   ·     │
│ ·        │      ·     │   ·     │
│ ·        │      ·     │   ·     │
│ A        │ 901283.968 │ black   │
│ C        │  942697.15 │ pink    │
│ A        │  780004.71 │ black   │
│ E        │ 207837.189 │ white   │
│ D        │ 403110.826 │ blue    │
│ A        │ 672348.467 │ black   │
│ D        │ 877300.687 │ blue    │
│ E        │ 337017.408 │ white   │
│ E        │ 762162.943 │ white   │
│ D        │ 297255.887 │ blue    │
├──────────┴────────────┴───

In [15]:
%%time
dfjoin = df.merge(dfcolor,left_on='category',right_on='cat')

CPU times: user 5.23 s, sys: 851 ms, total: 6.08 s
Wall time: 6.07 s


### 2，自定义函数UDF

pyspark的一个优点是可以在sql中使用注册自定义函数(UDF)，比较灵活。

那么duckdb支持在sql中使用注册自定义函数吗？ of course!

* 注册方法：使用create_function方法来注册一个Python函数作为UDF。需要提供函数名称、Python函数、参数类型和返回类型。

* 类型注解：如果Python函数有类型注解，可以省略parameters和return_type参数，DuckDB会根据注解自动推断。

* 空值处理：默认情况下，当UDF接收到NULL值时，会立即返回NULL。如果需要特殊处理，可以设置null_handling="special"。

* 异常处理：默认情况下，如果Python函数抛出异常，DuckDB会重新抛出该异常。如果希望改为返回null，可以设置exception_handling="return_null"。

* 副作用：如果UDF的结果受随机性影响，需要将side_effects设置为True。

* 使用Arrow：如果函数需要接收Arrow数组，设置type='arrow'。这会通知系统提供Arrow数组给函数，并期望函数返回相同数量的数组。

* 使用Native：当设置type='native'时，函数将按单个元组接收数据，并返回单个值。这适用于与不操作Arrow的Python库交互，如faker。



In [130]:
#先查看当前duckdb函数中带有 rand的函数
def get_rand_funs():
    query = """
    SELECT function_name,function_type,description,
        return_type,parameters,parameter_types,
        example 
    FROM duckdb_functions()  
    where function_name like '%rand%' 
    """
    df_funs = duckdb.sql(query).to_df()
    return df_funs

df_funs = get_rand_funs()
df_funs

Unnamed: 0,function_name,function_type,description,return_type,parameters,parameter_types,example
0,random,scalar,Returns a random number between 0 and 1,DOUBLE,[],[],random()
1,random_name,scalar,,VARCHAR,[col0],[BIGINT],
2,gen_random_uuid,scalar,Returns a random UUID similar to this: eeccb8c...,UUID,[],[],uuid()


In [136]:
import duckdb
from faker import Faker
def generate_random_name(i:int) -> str: 
    fake = Faker()
    fake.random.seed(i)
    
    #演示异常逻辑处理
    #if i%10==0:
    #    raise Exception('error') 
    
    name = fake.name()
    return name

#移除UDF
if 'random_name' in get_rand_funs()['function_name'].tolist():
    duckdb.remove_function('random_name')  

    
#注册UDF
duckdb.create_function("random_name", generate_random_name, 
                       [duckdb.typing.BIGINT], duckdb.typing.VARCHAR, 
                       exception_handling="return_null")


<duckdb.duckdb.DuckDBPyConnection at 0x7f9cf5d7e5f0>

In [138]:
tb_student = duckdb.sql("SELECT random_name(i) as name, cast(100*random() as INT) as score from range(100) tbl(i)")
tb_student 

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

┌─────────────────────┬───────┐
│        name         │ score │
│       varchar       │ int32 │
├─────────────────────┼───────┤
│ Norma Fisher        │    54 │
│ Ryan Gallagher      │     8 │
│ Theresa Brown       │    37 │
│ Joshua Wood         │    58 │
│ Brian Foster        │    93 │
│ Natalie Pope        │    62 │
│ Samantha Washington │    49 │
│ Chris Curtis        │    85 │
│ Victor Martinez     │    79 │
│ James Taylor        │    68 │
│      ·              │     · │
│      ·              │     · │
│      ·              │     · │
│ Brandon Keller      │     9 │
│ Christopher Sutton  │    67 │
│ Karl Grant          │    53 │
│ Kathleen Knight     │    23 │
│ Becky Ramirez       │    84 │
│ Kayla Osborn        │    35 │
│ Terry Foster        │    60 │
│ Robert Watson       │    48 │
│ Ronald Gomez        │    85 │
│ Curtis White        │    55 │
├─────────────────────┴───────┤
│     100 rows (20 shown)     │
└─────────────────────────────┘

In [139]:
pd.options.display.max_rows=1000
tb_student.to_df().head(15)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Unnamed: 0,name,score
0,Norma Fisher,19
1,Ryan Gallagher,40
2,Theresa Brown,36
3,Joshua Wood,50
4,Brian Foster,35
5,Natalie Pope,50
6,Samantha Washington,28
7,Chris Curtis,77
8,Victor Martinez,8
9,James Taylor,21


In [142]:
#再看看当前duckdb函数中带有 rand的函数，可以看到包括我们自定义的random_name了
get_rand_funs()


Unnamed: 0,function_name,function_type,description,return_type,parameters,parameter_types,example
0,random,scalar,Returns a random number between 0 and 1,DOUBLE,[],[],random()
1,random_name,scalar,,VARCHAR,[col0],[BIGINT],
2,gen_random_uuid,scalar,Returns a random UUID similar to this: eeccb8c...,UUID,[],[],uuid()


## 三，输出落表

In [144]:
tb_student.to_parquet('student.parquet')

## 