Poolboy
如果不控制好程序创建的最大并行进程数,系统资源很容易就会耗尽。Poolboy 就是为了解决这个问题,在 Erlang 下被广泛使用的轻量级,通用进程池程序库。
为什么需要 Poolboy?
让我们来考虑一下这个例子。你的任务是打造一个保存用户资料到数据库的应用。如果你为每个用户的注册操作都创建一个进程,最终可能会导致大量的数据库连接被创建出来。到了某一时刻,它可能就超过了数据库的承载能力。最终,你的应用也会抛出连接超时,和其它各种异常。
解决的方案是,使用一组 worker 进程来限制数据库连接,而不是为每一个用户注册操作创建一个进程。这样就能避免耗尽系统的资源。
Poolboy 就是为此产生。它允许你设置一个受 Supervisor
管理的 worker 进程池,并且还不需要花费你太多的精力去管理。很多程序库的底层都使用了 Poolboy。比如,postgrex
的连接池管理 *(Ecto 连接 PostgreSQL 时使用的程序库)*和 redis_poolex
(Redis 连接池) 都是出名的,使用了 Poolboy 的程序库。
安装
通过 mix,安装简直易如反掌。我们需要做的就是把 Poolboy 添加到 mix.exs
的依赖配置里面。
让我们先来创建一个应用:
$ mix new poolboy_app --sup
把 Poolboy 添加到 mix.exs
的依赖配置里面。
defp deps do
[{:poolboy, "~> 1.5.1"}]
end
然后安装获取依赖。
$ mix deps.get
配置选项
使用 Poolboy 前,我们还是要了解一些它的配置选项。
:name
- 进程池名字。命名空间(Scope)可以是:local
,:global
或者:via
。:worker_module
- 代表 worker 进程的模块。:size
- 最大进程数。:max_overflow
- 当进程池为空的时候,可创造的最大临时进程数。(可选):strategy
-:lifo
或:fifo
,决定了回收到进程池的 worker 进程,是放到可用 worker 进程队列的开头还是结尾。默认值为:lifo
。(可选)
开始配置 Poolboy
以下的例子,我们会创建一个负责处理计算平方根请求的 worker 进程池。样例会尽量简单以便于我们关注在 Poolboy 上为主。
让我们先配置 Poolboy,并把 Poolboy worker 进程池添加到我们的应用中作为一个子进程。编辑 lib/poolboy_app/application.ex
如下:
defmodule PoolboyApp.Application do
@moduledoc false
use Application
defp poolboy_config do
[
{:name, {:local, :worker}},
{:worker_module, PoolboyApp.Worker},
{:size, 5},
{:max_overflow, 2}
]
end
def start(_type, _args) do
children = [
:poolboy.child_spec(:worker, poolboy_config())
]
opts = [strategy: :one_for_one, name: PoolboyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
最开始定义的是进程池的配置选项。进程池的名字设置为 :worker
,:scope
设为 :local
。然后我们指派 PoolboyApp.Worker
模块作为 :worker_module
。进程池的大小通过 :size
设置为 5。同时,通过配置 :max_overflow
选项,我们还可以让进程池在 worker 进程繁忙的情况下,最多创建两个额外的 worker。(overflow
workers 完成工作后会被销毁。)
然后,我们把 :poolboy.child_spec/2
函数添加到 children 数组中,它就会随着应用的启动而启动。这个函数接收两个参数:进程池名字,和它的配置。
创建 Worker
Worker 模块只是一个简单的 GenServer
。它计算平方根,sleep 一秒,然后打印出 worker 的 pid。lib/poolboy_app/worker.ex
文件如下:
defmodule PoolboyApp.Worker do
use GenServer
def start_link(_) do
GenServer.start_link(__MODULE__, nil, [])
end
def init(_) do
{:ok, nil}
end
def handle_call({:square_root, x}, _from, state) do
IO.puts("process #{inspect(self())} calculating square root of #{x}")
:timer.sleep(1000)
{:reply, :math.sqrt(x), state}
end
end
Poolboy 的使用
既然我们已经有了 PoolboyApp.Worker
,我们就可以测试 Poolboy 了。我们先创建一个简单的,使用 Poolboy 创建并发进程的模块。:poolboy.transaction/3
是可以和 worker 进程池交互的函数。测试文件 lib/poolboy_app/test.ex
如下:
defmodule PoolboyApp.Test do
@timeout 60000
def start do
1..20
|> Enum.map(fn i -> async_call_square_root(i) end)
|> Enum.each(fn task -> await_and_inspect(task) end)
end
defp async_call_square_root(i) do
Task.async(fn ->
:poolboy.transaction(
:worker,
fn pid -> GenServer.call(pid, {:square_root, i}) end,
@timeout
)
end)
end
defp await_and_inspect(task), do: task |> Task.await(@timeout) |> IO.inspect()
end
运行测试,结果如下:
$ iex -S mix
iex> PoolboyApp.Test.start()
process #PID<0.182.0> calculating square root of 7
process #PID<0.181.0> calculating square root of 6
process #PID<0.157.0> calculating square root of 2
process #PID<0.155.0> calculating square root of 4
process #PID<0.154.0> calculating square root of 5
process #PID<0.158.0> calculating square root of 1
process #PID<0.156.0> calculating square root of 3
...
如果进程池已经把 worker 进程耗尽了,Poolboy 就会在默认的超时时间(5秒)后丢出超时错误,并不再接收任何新的请求。我们的这个例子,已经把默认的超时时间改成一分钟,就是为了展示如何更改默认的超时时间。你如果把 @timeout
修改到小于 1000,就能观察到错误。
即便我们尝试创建更多的进程 (上面的例子是 20 ),:poolboy.transaction/3
函数还是会限制最大的进程数为 5 (有需要的话会加上两个 overflow worker)。所有的请求都会通过进程池里面的 worker 来处理,而不会为每个新的请求创建新的进程。
最后更新于
这有帮助吗?