问答 2018-12-10 来自:开发者社区

Apache flink 1.52 Rowtime时间戳为空

我正在使用以下代码进行一些查询: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2()) .row...

问答 2018-12-10 来自:开发者社区

Apache Flink:map vs flatMap

在flink中,flatMap也可以发出一条记录。似乎flatMap可以取代map。有人能告诉我这种情况的不同吗?谢谢。

问答 2018-12-10 来自:开发者社区

窗口操作员行为澄清后的Apache Flink KeyedStream

我要求澄清Apache Flink(1.6.0)在通过窗口发送事件后如何处理来自KeyedStreams的事件,并且已经应用​​了某些运算符(例如reduce()或process())。 假设单个节点集群在执行了键控窗口流上的运算符之后,剩下的只有1个DataStream或者恰好是k个 DataStream(其中k是键的唯一值的数量)? 为了澄清,考虑需要从某些来源读取事件,按键k读取密钥,将键....

问答 2018-12-10 来自:开发者社区

处理时间窗口不适用于Apache Flink中的有限数据源

我正在尝试将一个非常简单的窗口函数应用于Apache Flink中的有限数据流(本地,没有集群)。这是一个例子: val env = StreamExecutionEnvironment.getExecutionEnvironmentenv .fromCollection(List("a", "b", "c", "d", "e")) .windowAll(TumblingProcessingT.....

问答 2018-12-10 来自:开发者社区

工作节点 - 与Apache Flink的文件系统关联

我有一个特定的监控系统,每个受监控的服务器上都有数千个本地保存的文件(没有HDFS)。我想用flink来查询这些文件。如果我在每台机器上创建一个工作节点并且它们查询特定文件,主节点将如何知道将此任务发送到相关文件所在的节点?我推荐的一个方法是最小化网络流量并避免在节点之间移动数据。有没有办法以某种方式“暗示”它?

问答 2018-12-06 来自:开发者社区

Apache Flink:不会触发流加入窗口

我正在尝试加入apache flink中的两个流来获得一些结果。 我的项目的当前状态是,我正在获取Twitter数据并将其映射到2元组,其中保存用户的语言和定义的时间窗口中的推文总和。我根据每种语言的推文数量和每种语言的转发数量来做这些。tweet / retweet聚合在其他进程中工作正常。 我现在想要在一个时间窗口中获得转推数量的百分比到所有推文的数量。 因此我使用以下代码: Time wi....

问答 2018-12-06 来自:开发者社区

Apache Flink Kubernetes Job Arguments

我正在尝试使用Kubernetes设置一个集群(Apache Flink 1.6.1),并在我运行作业时遇到以下错误: 2018-10-09 14:29:43.212 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - ---------------------------------------------...

问答 2018-12-06 来自:开发者社区

Apache Flink:Python流API中的Kafka连接器,“无法加载用户类”

我正在尝试使用Flink的新Python流API并尝试运行我的脚本./flink-1.6.1/bin/pyflink-stream.sh examples/read_from_kafka.py。python脚本相当简单,我只是尝试使用现有主题并将所有内容发送到stdout(或默认情况下输出方法发出数据的日志目录中的* .out文件)。 import globimport osimport sys....

问答 2018-12-06 来自:开发者社区

Apache Flink - 每小时聚合数据的每日汇总

我有一个窗口小时聚合的DataStream。 DataStream ds = ..... SingleOutputStreamOperator hourly = ds.keyBy(HourlyCountersAggregation.KEY_SELECTOR) .timeWindow(Time.hours(1)) .aggregate(new Hourly...

问答 2018-12-05 来自:开发者社区

在Apache Flink中手动更新状态的最佳方法是什么?

我在股票市场项目中使用Apache Flink来计算当前的价格变化。公式是 price_change = (current_price - previous_close_price) / previous_close_priceprevious_close_price是交易所前一天的证券收盘价。在市场开放前的每一天,我都需要更新previous_close_price。现在我想出了几个解决方案,....

本页面内关键词为智能算法引擎基于机器学习所生成,如有任何问题,可在页面下方点击"联系我们"与我们沟通。

产品推荐

Apache Spark 中国技术社区

阿里巴巴开源大数据技术团队成立 Apache Spark 中国技术社区,定期推送精彩案例,问答区数个 Spark 技术同学每日在线答疑,只为营造 Spark 技术交流氛围,欢迎加入!

+关注
相关镜像