从上一篇可以知道, fe将stream load的请求转发给be了, 并讲了fe的针对stream load的处理过程. 下面我们将分析be是如何处理stream load的.
如果你比较熟悉be的代码结构, 可以轻易找到在http_service.cpp中注册了处理stream load请求的处理类. http_service.cpp中注册stream load的核心代码如下: 通古这段代码我们知道: StreamLoadAction.cpp这个类主要负责处理stream load.
在StreamLoadAction.cpp泪中我们看到有很多方法, 我们看到on_header()这个方法时, 可以猜想这个方法应该是处理http header的方法, 我们打开看看, 代码如下: .
通过这个方法我们可以看到有预先设置的很多http header的key名字, 比如: HTTP_DB_KEY, HTTP_TABLE_KEY等. 是否有可能超时时间的key也是通过这个设置的呢? 我们找到这些定义这些key的地方: 查看全部key的定义我们终于找到了可能的那个超时http header: HTTP_TIMEOUT = "timeout“, 接下来我们看看这个变量在什么地方被使用了, 我们得到了如下代码:
这里是重点, 我们看到3个红框, 我分别讲解下
重点在于红框2中, begin_txn()方法是如何调用fe的, 发送了什么请求参数给fe?
通过上一步的讲解, 我们查看 begin_txn() 方法的具体实现, 核心代码如下:
通古查看代码, 我们看到:
#ifndef BE_TEST
这个代码是一个宏定义, 表示当不是be测试时代码将生效, 这段代码中核心变量我讲解下:
这里我们知道了be调用fe的RPC接口, 将超时参数传递给fe了, fe怎么处理的呢?
如果不知道fe中是如何实现RPC接口的, 我们可以全局搜索下loadTxnBegin()方法的定义: 当然, 这里thrift代码我是事先生成的, 不然RPC部分的代码会找不到. 如果你没有生成, 就直接全局搜索这个方法名, 看看什么地方做了具体实现. 如果你幸运的话, 你会找到如下:
找到这里, 我们猜测下FrontendServiceImpl.java应该是Fe的RPC服务的实现类.
我们跟随这个截图中 loadTxnBeginImpl( )方法, 看看这个方法的核心实现: 这里我们看到了, 单独设置了超时时间, 如果超时时间没有设置, 则使用默认的: Config.stream_load_default_timeout_second,
看到这里我们看看TransactionState构造函数, 会将超时时间赋予timeoutMs这个变量, 我们查看这个变量的引用可以得到以下代码: 查看红框中引用了超时变量的方法代码如下:
// return true if txn is running but timeout
public boolean isTimeout(long currentMillis) {
return transactionStatus == TransactionStatus.PREPARE && currentMillis - prepareTime > timeoutMs;
}
通过查看注释和代码逻辑, 我们可以得知这个应该是我们需要找的超时判断的方法. 我们查看这个方法的调用方法, 可以依次得到: transactionState.isTimeout() [Doris Stream Load 超时问题排查1] - stream load源码排查定位- 504 Gateway Time-out
这里超时问题也就查到根本原因了, 用户用户发送stream load请求时, 可以通过设置http header的方法(key=timeout)修改stream load导入超时时间. 如果不设置则等于: Config.stream_load_default_timeout_second, 默认300秒.
欢迎添加微信,互相学习↑↑↑ -_-
白老虎
programming is not only to solve problems, ways to think
grafana 级连 菜单 templating (variables) 配置
AI 机器人 抓取 微信 聊天中的 百度网盘 分享地址和密码
rocketmq 集群搭建 (2master + 2slave + 2namesrv)