ZVVQ代理分享网

案例(一)-KisFlow-Golang流实时计算-快速入门指南

作者:zvvq博客网
导读Github:https:// git hub.com/aceld/kis-flow 文档:https://github.com/aceld/kis-flow/wiki 第 1 部分-概览 Part2.1-项目构建/基础模块 Part2.2-项目构建/基础模块 第三部分-数据流 Part4-功能调度 第5部分-连接器

zvvq.cn

Github:https://github.com/aceld/kis-flow zvvq好,好zvvq

文档:https://github.com/aceld/kis-flow/wiki

第 1 部分-概览 内容来自samhan

Part2.1-项目构建/基础模块

copyright zvvq

Part2.2-项目构建/基础模块 内容来自samhan666

第三部分-数据流 内容来自samhan

Part4-功能调度 内容来自zvvq

第5部分-连接器 zvvq好,好zvvq

Part6-配置导入导出

zvvq好,好zvvq

Part7-KisFlow 动作 copyright zvvq

Part8-Cache/Params 数据缓存和数据参数 zvvq.cn

Part9-流程的多份副本 内容来自samhan666

Part10-Prometheus Metrics 统计 内容来自zvvq,别采集哟

Part11-基于反射的FaaS参数类型自适应注册

案例 1-快速入门 内容来自samhan666

Case2-流程并行操作

内容来自samhan666

Case3-KisFlow在多Goroutine中的应用

下载 KisFlow 源代码

$前往 github.com/aceld/kis-flow

内容来自zvvq

 

KisFlow 开发者文档

zvvq

KisFlow 快速入门(使用配置文件) 源代码示例: kis-flow-usage/2-quick_start_with_config 位于 main · acld/kis-flow-usage

首先,让我们创建一个具有以下文件结构的项目:

内容来自zvvq

项目目录

├── Makefile

本文来自zvvq

├── 会议 内容来自zvvq

│ ├── flow-CalStuAvgScore.yml

内容来自zvvq

│ ├── func-AvgStuScore.yml

内容来自samhan

│ └── func-PrintStuAvgScore.yml zvvq.cn

├── faas_stu_score_avg.go

本文来自zvvq

├── faas_stu_score_avg_print.go 内容来自samhan

└── main.go 内容来自zvvq,别采集哟

 

流动

定义当前Flow。目前的Flow名为“CalStuAvgScore”,是一个计算学生平均成绩的数据流。

内容来自zvvq

定义两个函数。 Function1是Calculate,是计算学生平均成绩的逻辑,Function2是Expand,是打印最终结果 本文来自zvvq

配置

Flow 和 Functions 的配置文件如下:

内容来自samhan

(1) 流程配置 conf/flow-CalStuAvgScore.yml

kistype:流动

本文来自zvvq

状态:1 内容来自samhan

flow_name:CalStuAvgScore 内容来自samhan

流量: copyright zvvq

- 文件名称:AvgStuScore 内容来自samhan

- 文件名称:PrintStuAvgScore

内容来自samhan

 
(2) 功能1配置

conf/func-AvgStuScore.yml 内容来自zvvq,别采集哟

kistype:功能 内容来自samhan666

文件名称:AvgStuScore 内容来自zvvq,别采集哟

fmode:计算 内容来自zvvq,别采集哟

来源: zvvq好,好zvvq

名称: 学生成绩

内容来自samhan666

必须:

zvvq.cn

- 学生ID copyright zvvq

 
(3)功能2配置

conf/func-PrintStuAvgScore.yml zvvq

kistype:功能 内容来自zvvq,别采集哟

文件名称:PrintStuAvgScore

copyright zvvq

fmode:展开

本文来自zvvq

来源:

本文来自zvvq

名称: 学生成绩 内容来自zvvq,别采集哟

必须: 内容来自zvvq,别采集哟

- 学生ID

copyright zvvq

 

主要的

接下来是主要逻辑,分为三步: copyright zvvq

加载配置文件并获取Flow实例。 提交数据。 运行流程。

main.go

zvvq.cn

包主 内容来自zvvq

进口 ( 本文来自zvvq

“语境” 内容来自samhan

“FMMT” zvvq.cn

“github.com/aceld/kis-flow/file”

本文来自zvvq

“github.com/aceld/kis-flow/kis”

copyright zvvq

内容来自zvvq,别采集哟

函数主() {

本文来自zvvq

ctx := context.Background() zvvq

// 从文件加载配置 内容来自zvvq,别采集哟

if err := file.ConfigImportYaml("conf/");错误!=零{ zvvq.cn

恐慌(错误)

copyright zvvq

} 内容来自samhan

// 获取流量 内容来自zvvq,别采集哟

flow1 := kis.Pool().GetFlow("CalStuAvgScore")

内容来自samhan

如果流 1 == nil {

本文来自zvvq

恐慌(“流1为零”)

zvvq.cn

}

zvvq

// 提交字符串

内容来自samhan666

_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`) 内容来自samhan

// 提交字符串

内容来自samhan

_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`)

内容来自zvvq,别采集哟

// 运行流程 zvvq好,好zvvq

if err := flow1.Run(ctx);错误!=零{

内容来自zvvq

fmt.Println("错误:",错误) copyright zvvq

}

内容来自zvvq,别采集哟

返回

zvvq好,好zvvq

} zvvq

 

功能1

第一个计算过程的实现逻辑如下。 AvgStuScoreIn 是输入数据类型,目前包含三个分数,AvgStuScoreOut 是输出数据类型,是平均分数。 zvvq

faas_stu_score_avg.go

zvvq好,好zvvq

包主 zvvq.cn

进口 ( copyright zvvq

“语境”

内容来自samhan666

“github.com/aceld/kis-flow/kis” zvvq好,好zvvq

“github.com/aceld/kis-flow/serialize” zvvq好,好zvvq

zvvq好,好zvvq

类型 AvgStuScoreIn 结构体 {

内容来自zvvq

序列化.DefaultSerialize

zvvq

StuId int `json:"stu_id"`

zvvq好,好zvvq

Score1 int `json:"score_1"` copyright zvvq

Score2 int `json:"score_2"`

内容来自samhan666

Score3 int `json:"score_3"`

内容来自samhan666

}

zvvq

类型 AvgStuScoreOut 结构体 {

内容来自zvvq

序列化.DefaultSerialize

zvvq.cn

StuId int `json:"stu_id"`

zvvq

AvgScore float64 `json:"avg_score"`

zvvq

} 本文来自zvvq

// AvgStuScore(FaaS) 计算学生的平均成绩

zvvq.cn

func AvgStuScore(ctx context.Context, flow kis.Flow, rows []AvgStuScoreIn) error { copyright zvvq

对于 _, row := 范围行 { 内容来自samhan

输出 := AvgStuScoreOut{

zvvq好,好zvvq

StuId:行.StuId,

内容来自samhan666

平均得分:float64(行.得分1+行.得分2+行.得分3) / 3, copyright zvvq

} copyright zvvq

// 提交结果数据 zvvq.cn

_ = flow.CommitRow(out) 内容来自samhan

} zvvq

返回零

zvvq好,好zvvq

} zvvq

 

功能2

打印的逻辑是直接打印数据,如下

zvvq好,好zvvq

faas_stu_score_avg_print.go

内容来自zvvq,别采集哟

1 内容来自samhan

2

zvvq.cn

3 内容来自zvvq,别采集哟

4 内容来自samhan

5 zvvq

6 内容来自samhan666

7

内容来自samhan666

8 内容来自zvvq,别采集哟

9 copyright zvvq

10

zvvq

11

zvvq好,好zvvq

12

本文来自zvvq

13

zvvq

14 zvvq.cn

15 内容来自zvvq

16 内容来自samhan

17 内容来自samhan666

18

zvvq好,好zvvq

19

本文来自zvvq

20

内容来自samhan666

21

zvvq好,好zvvq

22 copyright zvvq

23 zvvq

24 zvvq

25

zvvq

包主

内容来自zvvq,别采集哟

进口 (

zvvq好,好zvvq

“语境”

copyright zvvq

“FMMT”

zvvq

“github.com/aceld/kis-flow/kis” 本文来自zvvq

“github.com/aceld/kis-flow/serialize”

zvvq

本文来自zvvq

类型 PrintStuAvgScoreIn 结构体 {

zvvq.cn

序列化.DefaultSerialize

copyright zvvq

StuId int `json:"stu_id"`

zvvq

AvgScore float64 `json:"avg_score"` 内容来自zvvq

}

内容来自samhan666

类型 PrintStuAvgScoreOut 结构体 { 内容来自zvvq,别采集哟

序列化.DefaultSerialize

内容来自samhan666

} zvvq

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []PrintStuAvgScoreIn) error { 内容来自samhan666

对于 _, row := 范围行 {

内容来自samhan666

fmt.Printf("stuid: [%+v], 平均成绩: [%+v]n", row.StuId, row.AvgScore)

本文来自zvvq

} 内容来自zvvq

返回零 zvvq.cn

}

copyright zvvq

 

输出

最后运行程序,得到如下结果: copyright zvvq

1 zvvq.cn

2

内容来自samhan

3

本文来自zvvq

4 本文来自zvvq

5

zvvq.cn

添加 KisPool FuncName=AvgStuScore zvvq

添加 KisPool FuncName=PrintStuAvgScore zvvq

添加 FlowRouter FlowName=CalStuAvgScore zvvq

学生:[101],平均分数:[90] 内容来自zvvq,别采集哟

学习:[102],平均分数:[76.66666666666667] copyright zvvq

 

2.KisFlow快速入门(使用原生接口,动态配置)

源代码示例:kis-flow-usage/1-quick_start at main · acld/kis-flow-usage

copyright zvvq

项目目录

1 本文来自zvvq

2 内容来自zvvq

3 内容来自zvvq

├── faas_stu_score_avg.go

内容来自zvvq

├── faas_stu_score_avg_print.go

内容来自zvvq

└── main.go

zvvq.cn

 

流动

主要的

main.go 内容来自samhan

1 内容来自zvvq

2

zvvq好,好zvvq

3 zvvq好,好zvvq

4

内容来自zvvq,别采集哟

5

copyright zvvq

6

zvvq

7 本文来自zvvq

8

本文来自zvvq

9 copyright zvvq

10 内容来自samhan666

11

内容来自zvvq,别采集哟

12

内容来自samhan

13 zvvq.cn

14

内容来自zvvq,别采集哟

15

zvvq好,好zvvq

16

zvvq好,好zvvq

17

zvvq

18 zvvq

19 内容来自samhan666

20

内容来自samhan

21 zvvq.cn

22 zvvq

23

本文来自zvvq

24 本文来自zvvq

25 内容来自samhan

26

zvvq

27 zvvq.cn

28

内容来自zvvq,别采集哟

29

内容来自samhan666

30

zvvq

31

本文来自zvvq

32 copyright zvvq

33 zvvq.cn

34 本文来自zvvq

35

内容来自samhan

36 本文来自zvvq

37 zvvq好,好zvvq

38 内容来自zvvq,别采集哟

39

内容来自samhan

40 zvvq

41 本文来自zvvq

42 内容来自samhan666

43

zvvq

44

zvvq.cn

45

内容来自samhan

46 内容来自samhan666

包主

本文来自zvvq

进口 (

内容来自zvvq

“语境”

zvvq.cn

“FMMT”

zvvq好,好zvvq

“github.com/aceld/kis-flow/common” copyright zvvq

“github.com/aceld/kis-flow/config” copyright zvvq

“github.com/aceld/kis-flow/flow” 内容来自zvvq,别采集哟

“github.com/aceld/kis-flow/kis”

内容来自zvvq

copyright zvvq

函数主() { zvvq.cn

ctx := context.Background()

内容来自samhan666

// 创建新的流配置 zvvq好,好zvvq

myFlowConfig1 := config.NewFlowConfig("CalStuAvgScore", common.FlowEnable) zvvq.cn

// 创建新的函数配置

zvvq好,好zvvq

avgStuScoreConfig := config.NewFuncConfig("AvgStuScore", common.C, nil, nil) copyright zvvq

printStuScoreConfig := config.NewFuncConfig("PrintStuAvgScore", common.E, nil, nil)

zvvq.cn

// 创建一个新流 内容来自samhan

flow1 := flow.NewKisFlow(myFlowConfig1)

内容来自samhan666

// 将函数链接到流程

内容来自samhan

_ = flow1.Link(avgStuScoreConfig, nil)

zvvq.cn

_ = flow1.Link(printStuScoreConfig, nil)

内容来自zvvq,别采集哟

// 提交字符串

zvvq

_ = flow1.CommitRow(`{"stu_id":101, "score_1":100, "score_2":90, "score_3":80}`) zvvq

// 提交字符串

zvvq

_ = flow1.CommitRow(`{"stu_id":102, "score_1":100, "score_2":70, "score_3":60}`) zvvq

// 运行流程

zvvq.cn

if err := flow1.Run(ctx);错误!=零{ copyright zvvq

fmt.Println("错误:",错误) 内容来自samhan666

}

本文来自zvvq

返回

本文来自zvvq

}

本文来自zvvq

函数初始化{ 内容来自zvvq,别采集哟

// 注册函数

内容来自zvvq,别采集哟

kis.Pool().FaaS("AvgStuScore", AvgStuScore) 内容来自zvvq

kis.Pool().FaaS("PrintStuAvgScore", PrintStuAvgScore) 内容来自samhan

} 内容来自samhan666

 

功能1

faas_stu_score_avg.go

zvvq

1 zvvq好,好zvvq

2

zvvq.cn

3 zvvq

4

zvvq

5

内容来自zvvq,别采集哟

6 内容来自samhan

7

内容来自samhan666

8 内容来自zvvq,别采集哟

9 zvvq.cn

10 zvvq好,好zvvq

11

内容来自samhan

12

copyright zvvq

13

内容来自samhan

14 内容来自samhan666

15

zvvq好,好zvvq

16 本文来自zvvq

17 内容来自zvvq

18

内容来自zvvq

19

zvvq

20 内容来自zvvq,别采集哟

21

copyright zvvq

22 内容来自zvvq,别采集哟

23 内容来自samhan

24

zvvq

25 内容来自samhan

26

本文来自zvvq

27

本文来自zvvq

28 内容来自zvvq,别采集哟

29

内容来自zvvq

30

内容来自samhan666

31

zvvq

32 zvvq

33

内容来自zvvq,别采集哟

34

copyright zvvq

35

内容来自samhan

36

内容来自samhan666

37 本文来自zvvq

包主

内容来自samhan

进口 (

zvvq

“语境”

copyright zvvq

“github.com/aceld/kis-flow/kis”

zvvq

“github.com/aceld/kis-flow/serialize” 内容来自zvvq

内容来自zvvq

类型 AvgStuScoreIn 结构体 { 本文来自zvvq

序列化.DefaultSerialize zvvq好,好zvvq

StuId int `json:"stu_id"` zvvq好,好zvvq

Score1 int `json:"score_1"`

内容来自samhan

Score2 int `json:"score_2"` 内容来自samhan666

Score3 int `json:"score_3"` zvvq

}

copyright zvvq

类型 AvgStuScoreOut 结构体 {

内容来自zvvq

序列化.DefaultSerialize 本文来自zvvq

StuId int `json:"stu_id"`

zvvq

AvgScore float64 `json:"avg_score"` 本文来自zvvq

}

zvvq好,好zvvq

// AvgStuScore(FaaS) 计算学生的平均成绩 zvvq.cn

func AvgStuScore(ctx context.Context, flow kis.Flow, rows []AvgStuScoreIn) error { zvvq.cn

对于 _, row := 范围行 {

本文来自zvvq

输出 := AvgStuScoreOut{

内容来自samhan

StuId:行.StuId, 内容来自samhan666

平均得分:float64(行.得分1+行.得分2+行.得分3) / 3, 本文来自zvvq

}

内容来自samhan666

// 提交结果数据

内容来自zvvq,别采集哟

_ = flow.CommitRow(out) 内容来自zvvq

} 内容来自samhan666

返回零 本文来自zvvq

} 内容来自zvvq

 

功能2

faas_stu_score_avg_print.go

内容来自zvvq,别采集哟

1

zvvq好,好zvvq

2

内容来自zvvq,别采集哟

3

内容来自samhan

4 zvvq.cn

5 zvvq好,好zvvq

6

copyright zvvq

7 内容来自zvvq,别采集哟

8 内容来自zvvq,别采集哟

9 zvvq.cn

10

内容来自samhan666

11

内容来自samhan

12 内容来自zvvq,别采集哟

13

copyright zvvq

14

zvvq好,好zvvq

15 本文来自zvvq

16

zvvq好,好zvvq

17 内容来自samhan

18 内容来自samhan666

19

内容来自zvvq

20

内容来自samhan

21

本文来自zvvq

22

zvvq

23 zvvq.cn

24

内容来自samhan666

25

内容来自samhan666

26

内容来自zvvq

包主

zvvq好,好zvvq

进口 (

copyright zvvq

“语境” 内容来自samhan

“FMMT”

copyright zvvq

“github.com/aceld/kis-flow/kis” copyright zvvq

“github.com/aceld/kis-flow/serialize” zvvq好,好zvvq

内容来自samhan

类型 PrintStuAvgScoreIn 结构 {

zvvq

序列化.DefaultSerialize

zvvq.cn

StuId int `json:"stu_id"`

内容来自samhan666

AvgScore float64 `json:"avg_score"` zvvq

} 内容来自samhan

类型 PrintStuAvgScoreOut 结构体 {

zvvq

序列化.DefaultSerialize

zvvq.cn

} 内容来自samhan

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []PrintStuAvgScoreIn) error {

本文来自zvvq

对于 _, row := 范围行 {

内容来自samhan

fmt.Printf("stuid: [%+v], 平均成绩: [%+v]n", row.StuId, row.AvgScore) 内容来自zvvq

} zvvq

返回零

本文来自zvvq

} 内容来自samhan666

 

输出

1

zvvq

2 zvvq好,好zvvq

3 zvvq好,好zvvq

4 copyright zvvq

5

内容来自samhan

6 内容来自zvvq,别采集哟

添加 KisPool FuncName=AvgStuScore

本文来自zvvq

添加 KisPool FuncName=PrintStuAvgScore

zvvq好,好zvvq

funcName NewConfig 源为零,funcName = AvgStuScore,使用默认的未命名源。 本文来自zvvq

funcName NewConfig 源为零,funcName = PrintStuAvgScore,使用默认的未命名源。 内容来自samhan666

学生:[101],平均分数:[90]

本文来自zvvq

学习:[102],平均分数:[76.66666666666667]

内容来自samhan666

 

作者:Aceld

zvvq好,好zvvq

GitHub:https://github.com/aceld

KisFlow 开源项目地址:https://github.com/aceld/kis-flow zvvq

文档:https://github.com/aceld/kis-flow/wiki zvvq.cn

第 1 部分-概览

copyright zvvq

Part2.1-项目构建/基础模块 内容来自zvvq,别采集哟

Part2.2-项目构建/基础模块 内容来自samhan

第三部分-数据流 copyright zvvq

Part4-功能调度 内容来自zvvq

第5部分-连接器

内容来自samhan

Part6-配置导入导出

内容来自samhan666

Part7-KisFlow 动作

本文来自zvvq

Part8-Cache/Params 数据缓存和数据参数 copyright zvvq

Part9-流程的多份副本 copyright zvvq

Part10-Prometheus Metrics 统计 内容来自zvvq,别采集哟

Part11-基于反射的FaaS参数类型自适应注册

案例 1-快速入门 zvvq

Case2-Flow并行运行 内容来自zvvq,别采集哟

Case3-KisFlow在多Goroutine中的应用

以上就是案例 (一)-KisFlow-Golang流实时计算-快速入门指南的详细内容,更多请关注其它相关文章! 内容来自zvvq,别采集哟