-- build the index ourselves when inserting into the table; faster than table.insert
function _M.insert_tail(tab, ...)
    local idx = #tab
    -- iterate over the variadic arguments
    for i = 1, select('#', ...) do
        idx = idx + 1
        tab[idx] = select(i, ...)
    end

    return idx
end
select('#', ...) returns the number of arguments passed in, and select(i, ...) returns the i-th argument; this pattern is used heavily when iterating over tables.
function _M.try_read_attr(tab, ...)
    for i = 1, select('#', ...) do
        local attr = select(i, ...)
        if type(tab) ~= "table" then
            return nil
        end

        tab = tab[attr]
    end

    return tab
end
Usage example:
local size = core_tab.try_read_attr(local_conf, "graphql", "max_size")
if size then
    max_size = size
end
4. Utilities
APISIX wraps a number of utility modules, which together make up APISIX's PDK (Plugin Development Kit). With these helpers, plugin development gets a lot faster.
4.1. JSON operations
local delay_tab = setmetatable({data = "", force = false}, {
    __tostring = function(self)
        local res, err = encode(self.data, self.force)
        if not res then
            ngx.log(ngx.WARN, "failed to encode: ", err,
                    " force: ", self.force)
        end

        return res
    end
})

-- this is a non-thread safe implementation
-- it works well with log, eg: log.info(..., json.delay_encode({...}))
function _M.delay_encode(data, force)
    delay_tab.data = data
    delay_tab.force = force
    return delay_tab
end
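The point of the shared delay_tab is that serialization only happens when the log entry is actually written: __tostring runs when the log wrapper stringifies its arguments, so if the current log level filters the entry out, the JSON encode never executes. A usage sketch following the comment above (the route table is just an illustrative value):

-- the table is only serialized if this entry is actually emitted
core.log.info("route matched: ", core.json.delay_encode(route, true))

-- by contrast, core.json.encode(route) always pays the encoding cost,
-- even when the info level is disabled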
local function fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
                                 item_release, key, version)
    local obj, stale_obj = lru_obj:get(key)
    if obj and obj.ver == version then
        return obj
    end

    -- if the stale (TTL-expired) entry still carries the same version, re-set it into the cache
    if not invalid_stale and stale_obj and stale_obj.ver == version then
        lru_obj:set(key, stale_obj, item_ttl)
        return stale_obj
    end

    -- release callback
    if item_release and obj then
        item_release(obj.val)
    end

    return nil
end

-- returns an anonymous function that wraps a newly created LRU cache
local function new_lru_fun(opts)
    local item_count, item_ttl
    if opts and opts.type == 'plugin' then
        item_count = opts.count or PLUGIN_ITEMS_COUNT
        item_ttl = opts.ttl or PLUGIN_TTL
    else
        item_count = opts and opts.count or GLOBAL_ITEMS_COUNT
        item_ttl = opts and opts.ttl or GLOBAL_TTL
    end

    local item_release = opts and opts.release
    local invalid_stale = opts and opts.invalid_stale
    -- whether to serialize concurrent creation with a lock
    local serial_creating = opts and opts.serial_creating
    -- the argument is the LRU size
    local lru_obj = lru_new(item_count)

    return function (key, version, create_obj_fun, ...)
        -- resty.lock cannot be used in Nginx phases that do not support yielding
        if not serial_creating or not can_yield_phases[get_phase()] then
            local cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
                                                item_release, key, version)
            if cache_obj then
                return cache_obj.val
            end

            local obj, err = create_obj_fun(...)
            if obj ~= nil then
                lru_obj:set(key, {val = obj, ver = version}, item_ttl)
            end

            return obj, err
        end

        local cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
                                            item_release, key, version)
        if cache_obj then
            return cache_obj.val
        end

        -- acquire a lock when the cache misses
        -- the lock is backed by a shared memory dict
        local lock, err = resty_lock:new(lock_shdict_name)
        if not lock then
            return nil, "failed to create lock: " .. err
        end

        local key_s = tostring(key)
        log.info("try to lock with key ", key_s)

        -- acquire the lock
        local elapsed, err = lock:lock(key_s)
        if not elapsed then
            return nil, "failed to acquire the lock: " .. err
        end

        -- check the cache again after acquiring the lock
        cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl, nil,
                                      key, version)
        if cache_obj then
            lock:unlock()
            log.info("unlock with key ", key_s)
            return cache_obj.val
        end

        local obj, err = create_obj_fun(...)
        if obj ~= nil then
            lru_obj:set(key, {val = obj, ver = version}, item_ttl)
        end
        lock:unlock()
        log.info("unlock with key ", key_s)

        return obj, err
    end
end
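The closure returned by new_lru_fun is what callers use as the cache accessor: pass a key, a version and a constructor, and the constructor only runs on a miss. A minimal sketch (create_router, route_key and conf_version are illustrative names, not APISIX's own):

-- create a cache with up to 512 items and a 5-minute TTL
local lru_get = new_lru_fun({count = 512, ttl = 60 * 5})

local function create_router(conf)
    -- expensive construction, only executed on a cache miss
    return {conf = conf}
end

-- the entry is invalidated automatically when conf_version changes
local obj, err = lru_get(route_key, conf_version, create_router, conf)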
The OpenResty privileged agent process cannot handle requests; it can only be driven by timers. In code, a check like if ngx.process.type() == "privileged agent" ensures an operation runs only inside the privileged process.
Enables the privileged agent process in Nginx.
The privileged agent process does not listen on any virtual server ports like those worker processes. And it uses the same system account as the nginx master process, which is usually a privileged account like root.
The init_worker_by_lua* directive handler still runs in the privileged agent process. And one can use the type function provided by this module to check if the current process is a privileged agent.
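As a minimal sketch of how this is typically used (rotate_logs is a placeholder callback, not APISIX code), the worker-init hook checks the process type and only schedules the background job in the privileged agent:

local ngx_process = require("ngx.process")

local function init_worker()
    -- skip everything except the privileged agent process
    if ngx_process.type() ~= "privileged agent" then
        return
    end

    -- run the background job every 30 seconds, in the privileged agent only
    local ok, err = ngx.timer.every(30, rotate_logs)
    if not ok then
        ngx.log(ngx.ERR, "failed to create timer: ", err)
    end
end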
-- the default background timer of every worker; it runs the registered background jobs
local function background_timer()
    if core.table.nkeys(timers) == 0 then
        return
    end

    local threads = {}
    for name, timer in pairs(timers) do
        core.log.info("run timer[", name, "]")

        -- run each job in its own light thread
        local th, err = thread_spawn(timer)
        if not th then
            core.log.error("failed to spawn thread for timer [", name, "]: ", err)
            goto continue
        end

        core.table.insert(threads, th)

        ::continue::
    end

    local ok, err = thread_wait(unpack(threads))
    if not ok then
        core.log.error("failed to wait threads: ", err)
    end
end

function _M.init_worker()
    local opts = {
        each_ttl = 0,
        sleep_succ = 0,
        check_interval = check_interval,    -- defaults to 1 second
    }
    local timer, err = core.timer.new("background", background_timer, opts)
    if not timer then
        core.log.error("failed to create background timer: ", err)
        return
    end

    core.log.notice("succeed to create background timer")
end
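thread_spawn and thread_wait above are OpenResty light threads (ngx.thread.spawn / ngx.thread.wait). A standalone sketch of the pattern, independent of APISIX:

-- run two tasks concurrently inside one timer/request context
local function task(name)
    ngx.sleep(0.1)      -- simulated I/O; light threads yield instead of blocking the worker
    return name
end

local t1 = ngx.thread.spawn(task, "a")
local t2 = ngx.thread.spawn(task, "b")

-- note: ngx.thread.wait returns as soon as the *first* listed thread terminates
local ok, res = ngx.thread.wait(t1, t2)
if not ok then
    ngx.log(ngx.ERR, "thread failed: ", res)
end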
One reason APISIX enables the privileged agent process is to implement the Log Rotate plugin.
5. Request lifecycle
5.1. ctx
Use ngx.ctx wherever you can. ngx.var is much more expensive and is also limited to string values. The latter should only be used to exchange data with other nginx C modules.
APISIX caches the results fetched through ngx.var and passes them across the different request phases. It fetches variables via the lua-var-nginx-module Nginx C module plus FFI, and falls back to ngx.var when the C module is not enabled. By default APISIX does not load this C module in its build scripts; the PR feat: add lua-var-nginx-module adds it when compiling OpenResty.
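A rough sketch of that fetch-with-fallback idea; the resty.ngxvar calls follow the lua-var-nginx-module README, so treat the exact API as an assumption rather than APISIX's code:

-- prefer the FFI-based variable fetcher, fall back to ngx.var otherwise
local ok, ngxvar = pcall(require, "resty.ngxvar")

local get_var
if ok then
    local fetch, request = ngxvar.fetch, ngxvar.request
    get_var = function (name, req)
        return fetch(name, req or request())
    end
else
    get_var = function (name)
        return ngx.var[name]
    end
end

-- usage: get_var("remote_addr") avoids the relatively expensive ngx.var lookup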
function _M.set_vars_meta(ctx)
    -- fetch/create a table with a hash part sized 32 from the table pool
    local var = tablepool.fetch("ctx_var", 0, 32)
    if not var._cache then
        var._cache = {}
    end

    -- get the raw request C pointer via resty.core.base
    -- ref: https://github.com/openresty/lua-resty-core/blob/master/lib/resty/core/base.lua
    var._request = get_request()
    -- attach the metatable
    setmetatable(var, mt)
    -- cache the table in ngx.ctx
    ctx.var = var
end
do
    -- methods for fetching special kinds of vars
    local var_methods = {
        method = ngx.req.get_method,
        -- ref: https://github.com/cloudflare/lua-resty-cookie
        cookie = function () return ck:new() end
    }

    local mt = {
        -- overload the index metamethod; t is self
        __index = function(t, key)
            -- return directly if the value is already cached
            local cached = t._cache[key]
            if cached ~= nil then
                return cached
            end

            if type(key) ~= "string" then
                error("invalid argument, expect string value", 2)
            end

            local val
            -- special keys are fetched through their dedicated methods
            local method = var_methods[key]
            if method then
                val = method()

            elseif core_str.has_prefix(key, "cookie_") then
                -- reaches resty.cookie through var_methods
                local cookie = t.cookie
                if cookie then
                    local err
                    val, err = cookie:get(sub_str(key, 8))
                    if not val then
                        log.warn("failed to fetch cookie value by key: ",
                                 key, " error: ", err)
                    end
                end

            elseif core_str.has_prefix(key, "graphql_") then
                -- trim the "graphql_" prefix
                key = sub_str(key, 9)
                val = get_parsed_graphql(t)[key]

            elseif key == "route_id" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.route_id

            elseif key == "service_id" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.service_id

            elseif key == "consumer_name" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.consumer_name

            elseif key == "route_name" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.route_name

            elseif key == "service_name" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.service_name

            elseif key == "balancer_ip" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.balancer_ip

            elseif key == "balancer_port" then
                val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.balancer_port

            else
                val = get_var(key, t._request)
            end

            if val ~= nil then
                t._cache[key] = val
            end

            -- returns nil if nothing was found
            return val
        end,

        __newindex = function(t, key, val)
            if ngx_var_names[key] then
                ngx_var[key] = val
            end

            -- log.info("key: ", key, " new val: ", val)
            t._cache[key] = val
        end,
    }
Some of APISIX's internal route-matching parameters (route_id, route_name, service_id and so on) are injected into ngx.ctx.api_ctx during other phases.
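A short usage sketch of the cached var table, assuming ctx has already been prepared by set_vars_meta (the variable names are illustrative):

-- the first access goes through __index: special keys hit their dedicated branch,
-- everything else falls through to get_var(); the result is memoized in _cache
local remote_addr = ctx.var.remote_addr      -- plain Nginx variable
local token       = ctx.var.cookie_token     -- parsed via lua-resty-cookie
local route_id    = ctx.var.route_id         -- read from ngx.ctx.api_ctx

-- writes go through __newindex: real Nginx variables are updated, and the
-- value is cached so later reads stay cheap
ctx.var.upstream_uri = "/v1/echo"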
5.2. headers
-- cache the headers in the ngx.ctx table to avoid another FFI call
local function _headers(ctx)
    if not ctx then
        ctx = ngx.ctx.api_ctx
    end
    local headers = ctx.headers
    if not headers then
        headers = get_headers()
        ctx.headers = headers
    end

    return headers
end
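Calling the helper directly (just for illustration), the FFI call behind ngx.req.get_headers only happens once per request:

-- first call performs ngx.req.get_headers() and stores the table in ngx.ctx.api_ctx
local ua = _headers(nil)["user-agent"]

-- any later call, in the same or a later phase, reuses the cached table
local host = _headers(nil)["host"]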
The optimization here is the same as in Kong: warm the data up during the init_by_lua phase, after which it is inherited by the worker processes via fork().
It does not really make much sense to use this library in the context of init_by_lua because the cache will not get shared by any of the worker processes (unless you just want to “warm up” the cache with predefined items which will get inherited by the workers via fork()).
-- initialize etcd
function _M.init()
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    if table.try_read_attr(local_conf, "apisix",
                           "disable_sync_configuration_during_start") then
        return true
    end

    -- get the etcd client
    local etcd_cli, err = get_etcd()
    if not etcd_cli then
        return nil, "failed to start a etcd instance: " .. err
    end

    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    -- load all the etcd data into a Lua table (a singleton)
    local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
    if not res then
        return nil, err
    end

    return true
end
-- create the formatter
local function create_formatter(prefix)
    -- returns a closure that post-processes the response returned by etcd;
    -- despite the name, it is essentially a hook function
    return function (res)
        res.body.nodes = {}

        local dirs
        if is_http then
            dirs = constants.HTTP_ETCD_DIRECTORY
        else
            dirs = constants.STREAM_ETCD_DIRECTORY
        end

        local curr_dir_data
        local curr_key
        for _, item in ipairs(res.body.kvs) do
            if curr_dir_data then
                -- the item belongs to the current directory, append it
                if core_str.has_prefix(item.key, curr_key) then
                    table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item))
                    goto CONTINUE
                end

                curr_dir_data = nil
            end

            -- strip the prefix from the key
            local key = sub_str(item.key, #prefix + 1)
            if dirs[key] then
                -- single item
                loaded_configuration[key] = {
                    body = etcd_apisix.kvs_to_node(item),
                    headers = res.headers,
                }
            else
                -- same prefix: strip the trailing "/" as well
                local key = sub_str(item.key, #prefix + 1, #item.key - 1)
                -- ensure the same key hasn't been handled as single item
                if dirs[key] and not loaded_configuration[key] then
                    loaded_configuration[key] = {
                        body = {
                            nodes = {},
                        },
                        headers = res.headers,
                    }
                    curr_dir_data = loaded_configuration[key].body.nodes
                    curr_key = item.key
                end
            end

            ::CONTINUE::
        end

        return res
    end
end
-- a timer that automatically syncs data from etcd
local function _automatic_fetch(premature, self)
    if premature then
        return
    end

    local i = 0
    while not exiting() and self.running and i <= 32 do
        i = i + 1

        local ok, err = xpcall(function()
            if not self.etcd_cli then
                local etcd_cli, err = get_etcd()
                if not etcd_cli then
                    error("failed to create etcd instance for key ["
                          .. self.key .. "]: " .. (err or "unknown"))
                end
                self.etcd_cli = etcd_cli
            end

            -- sync the data
            local ok, err = sync_data(self)
            if err then
                if err ~= "timeout" and err ~= "Key not found"
                   and self.last_err ~= err then
                    log.error("failed to fetch data from etcd: ", err, ", ",
                              tostring(self))
                end

                if err ~= self.last_err then
                    self.last_err = err
                    self.last_err_time = ngx_time()
                else
                    if ngx_time() - self.last_err_time >= 30 then
                        self.last_err = nil
                    end
                end

                ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            elseif not ok then
                -- no error. reentry the sync with different state
                ngx_sleep(0.05)
            end

        end, debug.traceback)

        if not ok then
            log.error("failed to fetch data from etcd: ", err, ", ",
                      tostring(self))
            ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
            break
        end
    end

    -- schedule the next round
    if not exiting() and self.running then
        ngx_timer_at(0, _automatic_fetch, self)
    end
end
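Two details are worth noting in the loop above: each timer run does at most 32 sync rounds and then reschedules itself with ngx.timer.at, and the retry sleep is jittered around resync_delay so the workers do not hammer etcd in lockstep. A standalone sketch of that pattern (names are illustrative):

local function jittered_sleep(base)
    -- sleep between 1.0x and 1.5x of the base delay to spread out the retries
    ngx.sleep(base + math.random() * 0.5 * base)
end

local function fetch_loop(premature, state)
    if premature then
        return              -- the worker is shutting down
    end

    -- ... do one bounded batch of sync work, sleeping with jitter on errors ...

    -- hand control back to the event loop and schedule the next batch
    local ok, err = ngx.timer.at(0, fetch_loop, state)
    if not ok then
        ngx.log(ngx.ERR, "failed to schedule the next fetch: ", err)
    end
end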
-- create an etcd-backed configuration instance
function _M.new(key, opts)
    local local_conf, err = config_local.local_conf()
    if not local_conf then
        return nil, err
    end

    -- the etcd resync interval defaults to 5 seconds, the same as Kong's db poll interval
    local etcd_conf = local_conf.etcd
    local prefix = etcd_conf.prefix
    local resync_delay = etcd_conf.resync_delay
    if not resync_delay or resync_delay < 0 then
        resync_delay = 5
    end

    local automatic = opts and opts.automatic
    local item_schema = opts and opts.item_schema
    local filter_fun = opts and opts.filter
    local timeout = opts and opts.timeout
    local single_item = opts and opts.single_item
    local checker = opts and opts.checker

    if automatic then
        -- fetch the data periodically with a timer
        if not key then
            return nil, "missing `key` argument"
        end

        -- read the preloaded etcd data from the singleton table and process it
        if loaded_configuration[key] then
            local res = loaded_configuration[key]
            -- clear the singleton entry once consumed
            loaded_configuration[key] = nil    -- tried to load

            log.notice("use loaded configuration ", key)

            local dir_res, headers = res.body, res.headers
            -- load, validate and filter the data
            load_full_data(obj, dir_res, headers)
        end
-- attach common methods if the router doesn't provide its custom implementation
local function attach_http_router_common_methods(http_router)
    ...

    if http_router.init_worker == nil then
        http_router.init_worker = function (filter)
            -- register the user routes
            http_router.user_routes = http_route.init_worker(filter)
        end
    end
end
function _M.http_init_worker()
    local conf = core.config.local_conf()
    -- the default matching implementations
    local router_http_name = "radixtree_uri"
    local router_ssl_name = "radixtree_sni"

    if conf and conf.apisix and conf.apisix.router then
        router_http_name = conf.apisix.router.http or router_http_name
        router_ssl_name = conf.apisix.router.ssl or router_ssl_name
    end
do
    local uri_routes = {}
    local uri_router
    local match_opts = {}

    function _M.match(api_ctx)
        -- the routes live in the module's user_routes attribute; it is refreshed
        -- by a callback whenever the routes change in etcd
        local user_routes = _M.user_routes
        if not cached_version or cached_version ~= user_routes.conf_version then
            uri_router = base_router.create_radixtree_uri_router(user_routes.values,
                                                                 uri_routes, false)
            cached_version = user_routes.conf_version
        end

        if not uri_router then
            core.log.error("failed to fetch valid `uri` router: ")
            return true
        end

        return base_router.match_uri(uri_router, match_opts, api_ctx)
    end

end
The radixtree matching library (lua-resty-radixtree) accepts a handler callback that fires on a successful match; the matched route is then handed over to the ctx.
core.table.insert(uri_routes, {
    ...
    handler = function (api_ctx, match_opts)
        api_ctx.matched_params = nil
        api_ctx.matched_route = route
        api_ctx.curr_req_matched = match_opts.matched
    end
})
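A rough standalone sketch of how such a handler behaves in lua-resty-radixtree; the exact dispatch arguments are an assumption, so check the library README before relying on them:

local radix = require("resty.radixtree")

local rx = radix.new({
    {
        paths = {"/hello/*"},
        handler = function (ctx)
            -- only invoked when the request path matches this route
            ctx.matched = true
        end,
    },
})

local ctx = {}
-- extra arguments to dispatch() are passed straight through to the handler
rx:dispatch("/hello/world", nil, ctx)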
8. Balancer
The Balancer part follows the same logic as Kong (even the function names in the code are identical). The main pieces are Service/Upstream node resolution, the load-balancing policy, health checks and failure retries.
One feature APISIX adds is external service discovery. Kong by default resolves the Service host via DNS, adding IPs and priorities from AAAA, A and SRV records; APISIX can additionally fetch the IP list from Consul, Eureka and other registries and keeps the node data in sync via long polling.
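Conceptually, a discovery backend only needs to expose an init_worker() hook and a nodes() lookup. A hedged sketch of that contract (module layout and field names are illustrative):

local _M = {}

function _M.init_worker()
    -- start a background timer that long-polls the registry (Consul, Eureka, ...)
    -- and keeps an up-to-date node list in a worker-local table
end

function _M.nodes(service_name)
    -- return the node list in the same shape as a statically configured upstream
    return {
        {host = "192.168.1.10", port = 8080, weight = 1},
        {host = "192.168.1.11", port = 8080, weight = 1},
    }
end

return _M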
function _M.set_by_route(route, api_ctx)
    ...
    -- if the service host is a domain name, resolve it via service discovery / DNS
    if up_conf.service_name then
        ...
        -- external service registry
        local dis = discovery[up_conf.discovery_type]
        if not dis then
            return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
        end

        -- fetch the IPs from the registry's data source (cached in a local table)
        local new_nodes, err = dis.nodes(up_conf.service_name)
        if not new_nodes then
            return HTTP_CODE_UPSTREAM_UNAVAILABLE,
                   "no valid upstream node: " .. (err or "nil")
        end

    local nodes_count = up_conf.nodes and #up_conf.nodes or 0
    if nodes_count == 0 then
        return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node"
    end
    ...

    set_upstream_scheme(api_ctx, up_conf)

    local ok, err = fill_node_info(up_conf, api_ctx.upstream_scheme, false)
    if not ok then
        return 503, err
    end
    ...

    local scheme = up_conf.scheme
    if (scheme == "https" or scheme == "grpcs") and up_conf.tls then
        ...
    end
-- set_balancer_opts will be called in balancer phase and before any tries
local function set_balancer_opts(route, ctx)
    local up_conf = ctx.upstream_conf

    -- If the matched route has timeout config, prefer to use the route config.
    local timeout = nil
    if route and route.value and route.value.timeout then
        timeout = route.value.timeout
    else
        if up_conf.timeout then
            timeout = up_conf.timeout
        end
    end
    -- set the Nginx upstream timeouts
    if timeout then
        local ok, err = set_timeouts(timeout.connect, timeout.send, timeout.read)
        if not ok then
            core.log.error("could not set upstream timeouts: ", err)
        end
    end

    local retries = up_conf.retries
    if not retries or retries < 0 then
        retries = #up_conf.nodes - 1
    end

    -- set the number of retries Nginx performs on failure
    if retries > 0 then
        local ok, err = set_more_tries(retries)
        ...
    end
end
function init_plugins_syncer()
    local err
    -- stores the plugin configuration, a single kv entry
    plugins_conf, err = core.config.new("/plugins", {
        automatic = true,     -- create a background timer that watches etcd and syncs automatically
        item_schema = core.schema.plugins,
        single_item = true,
        -- the filter callback receives the etcd kv item and reloads the plugins,
        -- so every change of the plugin configuration in etcd triggers a reload
        filter = function(item)
            -- we need to pass 'item' instead of plugins_conf because
            -- the latter one is nil at the first run
            _M.load(item)
        end,
    })
    if not plugins_conf then
        error("failed to create etcd instance for fetching /plugins : " .. err)
    end
end
-- load the plugins
local function load(plugin_names)
    local processed = {}
    for _, name in ipairs(plugin_names) do
        if processed[name] == nil then
            processed[name] = true
        end
    end
-- bind plugins with their configuration
function _M.filter(user_route, plugins)
    ...

    plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
    for _, plugin_obj in ipairs(local_plugins) do
        local name = plugin_obj.name
        local plugin_conf = user_plugin_conf[name]

        -- store the plugin object and its configuration next to each other
        if type(plugin_conf) == "table" and not plugin_conf.disable then
            core.table.insert(plugins, plugin_obj)
            core.table.insert(plugins, plugin_conf)
        end
    end
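Because the plugin module and its configuration are appended as adjacent entries, consumers of this array walk it with a step of 2. A hedged sketch of how run_plugin-style iteration consumes it (simplified; the real function also handles return codes and more phases):

for i = 1, #plugins, 2 do
    local plugin_obj  = plugins[i]        -- the plugin module
    local plugin_conf = plugins[i + 1]    -- the configuration matched for this route

    local phase_func = plugin_obj["access"]
    if phase_func then
        local code, body = phase_func(plugin_conf, api_ctx)
        if code or body then
            -- a plugin may short-circuit the request by returning a response
        end
    end
end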
    core.resolver.init_resolver(args)
    -- generate the node id
    core.id.init()

    -- enable OpenResty's privileged agent process
    local process = require("ngx.process")
    local ok, err = process.enable_privileged_agent()
    if not ok then
        core.log.error("failed to enable privileged_agent: ", err)
    end

    -- load the configuration from etcd or the local yaml file; the etcd provider has an init function
    if core.config.init then
        local ok, err = core.config.init()
        if not ok then
            core.log.error("failed to load the configuration: ", err)
        end
    end
end
function _M.http_init_worker()
    local seed, err = core.utils.get_seed_from_urandom()
    if not seed then
        core.log.warn('failed to get seed from urandom: ', err)
        seed = ngx_now() * 1000 + ngx.worker.pid()
    end
    math.randomseed(seed)
    -- for testing only
    core.log.info("random test in [1, 10000]: ", math.random(1, 10000))

    -- inter-process event communication
    local we = require("resty.worker.events")
    local ok, err = we.configure({shm = "worker-events", interval = 0.1})
    if not ok then
        error("failed to init worker event: " .. err)
    end
    -- service discovery lib
    local discovery = require("apisix.discovery.init").discovery
    -- service discovery is disabled by default
    if discovery and discovery.init_worker then
        discovery.init_worker()
    end
    -- initialize the load balancer (its init_worker is currently a no-op)
    require("apisix.balancer").init_worker()
    load_balancer = require("apisix.balancer")
    -- TODO: analyze the admin flow
    require("apisix.admin.init").init_worker()
    -- register the global background timer
    require("apisix.timers").init_worker()
-- the access_by_lua phase; APISIX does not use rewrite_by_lua
-- ref: https://github.com/apache/apisix/issues/1120
-- ref: https://github.com/apache/apisix/issues/1120#issuecomment-584949073
function _M.http_access_phase()
    local ngx_ctx = ngx.ctx
    ...

    -- always fetch table from the table pool, we don't need a reused api_ctx
    local api_ctx = core.tablepool.fetch("api_ctx", 0, 32)
    -- store the table in ngx.ctx so it is shared with the following phases
    ngx_ctx.api_ctx = api_ctx

    -- run global rule
    plugin.run_global_rules(api_ctx, router.global_rules, nil)

    ...
    local enable_websocket = route.value.enable_websocket

    -- bind the plugin_config to the route
    if route.value.plugin_config_id then
        ...
        route = plugin_config.merge(route, conf)
    end

    -- fetch the corresponding service
    if route.value.service_id then
        local service = service_fetch(route.value.service_id)
        ...
        if enable_websocket == nil then
            enable_websocket = service.value.enable_websocket
        end

    else
        ...
    end
    api_ctx.route_id = route.value.id
    api_ctx.route_name = route.value.name

    -- run the script if the route has one
    if route.value.script then
        script.load(route, api_ctx)
        script.run("access", api_ctx)
    else
        -- filter the plugins: iterate the plugin list and keep the enabled ones, O(n)
        local plugins = plugin.filter(route)
        api_ctx.plugins = plugins

        -- run the faked "rewrite" phase
        plugin.run_plugin("rewrite", plugins, api_ctx)
        if api_ctx.consumer then
            local changed
            route, changed = plugin.merge_consumer_route(
                route,
                api_ctx.consumer,
                api_ctx
            )

            if changed then
                core.table.clear(api_ctx.plugins)
                api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
            end
        end
        -- run the access phase
        plugin.run_plugin("access", plugins, api_ctx)
    end

    local up_id = route.value.upstream_id

    -- used for the traffic-split plugin
    if api_ctx.upstream_id then
        up_id = api_ctx.upstream_id
    end
    ...

    -- special handling for websocket
    if enable_websocket then
        api_ctx.var.upstream_upgrade = api_ctx.var.http_upgrade
        api_ctx.var.upstream_connection = api_ctx.var.http_connection
        core.log.info("enabled websocket for route: ", route.value.id)
    end

    if route.value.service_protocol == "grpc" then
        api_ctx.upstream_scheme = "grpc"
    end

    -- resolve the upstream nodes
    local code, err = set_upstream(route, api_ctx)
    if code then
        core.log.error("failed to set upstream: ", err)
        core.response.exit(code)
    end

    -- load balancing
    local server, err = load_balancer.pick_server(route, api_ctx)
    if not server then
        core.log.error("failed to pick server: ", err)
        return core.response.exit(502)
    end

    api_ctx.picked_server = server

    set_upstream_headers(api_ctx, server)

    -- stash the ngx.ctx; this part is identical to Kong's (one suspects it was copied, with 95% confidence)
    ngx_var.ctx_ref = ctxdump.stash_ngx_ctx()
    local up_scheme = api_ctx.upstream_scheme
    if up_scheme == "grpcs" or up_scheme == "grpc" then
        return ngx.exec("@grpc_pass")
    end

    if api_ctx.dubbo_proxy_enabled then
        return ngx.exec("@dubbo_pass")
    end
end
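For reference, the plugin objects that run_plugin iterates are plain Lua modules exporting one function per phase. A minimal, simplified skeleton (real plugins also declare a schema, a priority and other metadata):

local core = require("apisix.core")

local _M = {
    version = 0.1,
    name = "example-plugin",
}

function _M.rewrite(conf, api_ctx)
    -- runs in the faked "rewrite" step inside the access phase
    core.log.info("rewrite phase, route id: ", api_ctx.route_id)
end

function _M.access(conf, api_ctx)
    -- runs in the access phase; returning a status code short-circuits the request
    if conf.block then
        return 403
    end
end

return _M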