研究微服务治理,苦于找不到相对于php合适的rpc框架。多个服务起来,用curl或者guzzle相互域名请求简直太逆天奇葩了。由于服务大部分都是php的,没有常驻后台的daemon开发送心跳。因此对于rpc的选型也是个很严肃的问题。
看过yar、dubbo等协议,但是不是特别理想。yar本身是php的扩展。但是不支持服务治理,或者说只是一个远程过程调用的工具。dubbo主力支持java,其他语言虽然支持,但本质对于php来说和curl没大区别。今儿实战演示一下gRpc的几种调用通讯模式(普通、客户端流、服务端流、双向流)。以及和PHP客户端的联通调用。
1. 需求分析
我们这次只搞个很简单的需求,搞个用户server系统,提供2个接口给外部,1个是保存用户信息,1个是根据用户UID查询用户信息,就这2个,不搞复杂了。
简单的用代码描述下就是:
1 2
| function1 saveUser(name, age) return (UID) function1 getUserInfo(UID) return (UID, name, age)
|
ok,需求分析结束。
2. 编写ptorobuf文件
需求分析结束之后,我们明确了我们需要干什么了,我们需要对外提供2个grp接口,那么我们就开始编写protobuf吧:
先定义2个rpc服务函数
1 2 3 4
| service UserServer { rpc SaveUser (UserParams) returns (Id) {} rpc GetUserInfo (Id) returns (UserInfo) {} }
|
定义好了rpc之后,我们知道,函数里面的参数和返回值得都得是message信息,OK,我们就开始创建参数和返回值对应的3个message吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| //用户ID message Id { int32 id = 1 }
//名字 message Name { string name = 1 }
//年龄 message Age { int32 age = 1 }
// 用户信息 message UserInfo { int32 id = 1 string name = 2 int32 age = 3 }
// 用户参数 message UserParams { Name name = 1 Age age = 2 }
|
上面的写法是,将age和name单独出来搞成2个message,然后在UserParams里面搞成嵌套message,目的是想看下嵌套message在不同语言中的用法。但是总体也是比较简单的。
完整的ptoro文件为:
1 2 3 4 5 6 7 8 9
| //userServer.proto
syntax = "proto3" package proto option go_package = ".;proto"
message ...
service ...
|
3. 编译成go版本的服务端和php版本的客户端文件
编写完成protobuf文件了,接下来就是编译成不同的语言了,我们将使用protoc
命令,来编译生成不同语言的版本库。我们用go语言作为服务端语言,用php和go分别作为客户端语言,完成本次的调用。
如果你机器上还未安装protobuf工具,安装很简单,如果是Mac的话,一条命令就搞定了:
brew install protobuf
如果是其他系统,请参考官方文档,也很简单。
安装好的服务,命令执行的关键字是 protoc --go_out= xx xx
3.1 编译成go版本
先编译生成go版本的服务端和客户端:
protoc --go_out=plugins=grpc:. userServer.proto
需要注意的是我们需要加上grpc的支持,生成grpc服务的代码。如果你执行报错,可能是protoc-gen-go
扩展没安装,安装很简单:
go get -u github.com/golang/protobuf/protoc-gen-go
它会在$GOPATH/bin
目录下生成1个可执行的protoc-gen-go
文件。所以,这个文件不要删了。不然你使用--go_out
时,会找不到protoc-gen-go
,提示报错。
OK,执行完毕之后,就会在proto目录下生成userServer.pg.go
的文件,里面将proto里的message都转换成了go语言的struct,并且也把rpc也转换生成了2个可调用的客户端函数。
我们看下生成后目录结构,生成的ph.go文件在proto目录下。
1 2 3 4 5 6 7 8 9 10 11 12
| . ├── client │ └── simple_client │ └── client.go ├── go.mod ├── go.sum ├── proto │ ├── userServer.pb.go │ └── userServer.proto └── server └── simple_server └── server.go
|
3.2 编译成php客户端
我们在PHP里面去调用go提供的grpc服务,那么PHP就是一个客户端,同理,在PHP里面使用,其实也需要编译这个protobug文件,需要用到--php_out
这个参数。
利用prcl快速安装protobuf和grpc这2个php扩展
1 2
| sudo pecl install protobuf sudo pecl install grpc
|
下载grpc源码,这一步是为了生成php的proto生成器,可以给我们生成client服务代码,当然你也可以自己去写代码,不用这个生成器。但是对于新手,我建议你下载安装。
1 2 3
| git clone -b v1.34.0 https://github.com/grpc/grpc git submodule update --init make grpc_php_plugin
|
php生成器位置生成在:/Users/Jack/www/grpc/bins/opt/grpc_php_plugin
我们把同一份userServer.proto
文件,拷贝到我们的php环境目录下。然后执行命令,生成php和grpc服务类php文件:
1
| protoc -I=. userServer.proto --php_out=. --grpc_out=. --plugin=protoc-gen-grpc=/Users/Jack/www/grpc/bins/opt/grpc_php_plugin
|
在项目更目录下新建 composer.json 文件
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "name": "grpc-go-php", "require": { "grpc/grpc": "^v1.34.0", "google/protobuf": "^v3.14.0" }, "autoload": { "psr-4": { "GPBMetadata\\": "GPBMetadata/", //自动生成的php类文件夹 "Proto\\": "Proto/" //proto的文件夹 } } }
|
执行下载2个依赖的库。这2个库类似于sdk,它们封装了很多方法方便我们应用层去使用,它们底层会去调用上面用prcl生成的2个php扩展的方法。
composer install
这一通,完成后,我们看下目录结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| ├── GPBMetadata │ └── UserServer.php ├── Proto │ ├── Age.php │ ├── Id.php │ ├── Name.php │ ├── UserInfo.php │ ├── UserParams.php │ └── UserServerClient.php ├── composer.json ├── composer.lock ├── main.php ├── userServer.proto └── vendor ├── autoload.php ├── composer ├── google │ └── protobuf │ └── grpc └─
|
其中GPBMetadata
和Proto
文件夹是自动生成的。vendor
里的2个扩展也是composer自动下载生成的。
OK,一切准备好了,go版本的服务端和客户端准备就绪。php版本的客户端也准备就绪。
4. 普通模式调用
普通模式,也叫一元模式,它是最常见,也是使用做多的方式,和普通的http请求模式是一样的。
客户端像服务端发送1次请求,然后服务端给出响应结果。很简单的模式。
1 2
| client --1---> server client <--2--- server
|
OK,那我们来看下,我们如何实现这种普通模式的调用。
4.1 go语言的调用实现
我们先用go来实现,上面go的服务端和客户端代码都生成好了。我们先在server
和client
目录下,分别新建自己的代码:
1 2 3 4 5 6 7
| . ├── client │ └── simple_client │ └── client.go └── server └── simple_server └── server.go
|
我们先来完成server端的代码逻辑,server端上面的逻辑,主要分3方面:
- 新建tcp连接。
- 注册grpc服务,并把它挂到tcp上。
- 完成对外提供的几个rpc方法的逻辑。
我们就按照这个逻辑来开始写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package main import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "log" "math/rand" "net" )
type UserServer struct{} func main() { listen, err := net.Listen("tcp", "127.0.0.1:9527") if err != nil { log.Fatalf("tcp listen failed:%v", err) } server := grpc.NewServer() fmt.Println("userServer grpc services start success") proto.RegisterUserServerServer(server, &UserServer{}) _ = server.Serve(listen) }
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) { id := rand.Int31n(100) res := &proto.Id{Id: id} fmt.Printf("%+v", params.GetAge()) fmt.Printf("%+v\n", params.GetName()) return res, nil }
func (Service *UserServer) GetUserInfo(ctx context.Context, id *proto.Id) (*proto.UserInfo, error) { res := &proto.UserInfo{Id: id.GetId(), Name: "test", Age: 31} return res, nil }
|
2个rpc方法很简单,只是mock数据,并没有真实实现。后期有时间,再来实现吧。值得注意是,rpc的函数,第一个参数是固定的ctx context.Context
,这是用于控制信号和超时的,是固定写法。有空我专门来搞一起context包的学习
。
我们运行一下, grpc服务启动成功:
1 2
| $ go run server/simple_server/server.go userServer grpc services start success
|
在来看看client端,如何利用生成的pb.go文件,来实现逻辑呢?也是一样
- 监听server启动的tcp的ip:端口。
- 新建连接client服务,并绑定tcp。
- 去调用这2个rpc的函数。
开始写逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package main import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "log" ) var client proto.UserServerClient func main() { connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure()) if err != nil { log.Fatalln(err) } client = proto.NewUserServerClient(connect) SaveUser() GetUserInfo() } func SaveUser() { params := proto.UserParams{} params.Age = &proto.Age{Age: 31} params.Name = &proto.Name{Name: "test"} res, err := client.SaveUser(context.Background(), ¶ms) if err != nil { log.Fatalf("client.SaveUser err: %v", err) } fmt.Printf("%+v\n", res.Id) } func GetUserInfo() { res, err := client.GetUserInfo(context.Background(), &proto.Id{Id: 1}) if err != nil { log.Fatalf("client.userInfo err: %v", err) } fmt.Printf("%+v\n", res) }
|
调用的rpc的方法,是pb.go文件里已经帮我们生成了,我们直接调用即可。这2个函数的参数和返回值,和刚在server定义的得保持一致。第一个参数得是context.Context
。
我们测试一下client的代码,是否可以调通:
1 2 3
| $ go run client/simple_client/client.go 23 id:1 name:”test” age:31
|
我们可以看到有返回值了,在server那边也有了打印:
1 2 3 4 5 6
| $ go run server/simple_server/server.go userServer grpc services start success age:31name:"test" age:31name:"test" age:31name:"test" age:31name:"test"
|
ok,到此为止,go版本的客户端和服务端的grpc通讯成功了。
4.2 php语言的客户端调用
我们再用php来调用go的grpc服务,看下具体是怎么操作。首先,我们在上面已经自动帮我们生成了grpc的client的server类,那我们就直接调用。
操作逻辑,和go client的步骤是一样的,分成2步:
- 监听server启动的tcp的ip:端口,并直接新建连接client服务
- 去调用这2个rpc的函数。
我们具体看下,代码应该怎么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| <?php
require __DIR__ . '/vendor/autoload.php';
$client = new \Proto\UserServerClient('127.0.0.1:9527', [ 'credentials' => Grpc\ChannelCredentials::createInsecure() ]);
$UserParams = new \Proto\UserParams(); $age = new \Proto\Age(); $age->setAge("18"); $UserParams->setAge($age); $name = new \Proto\Name(); $name->setName("jack"); $UserParams->setName($name);
list($Id, $status) = $client->SaveUser($UserParams)->wait(); var_dump($status, $Id->getId());
$Id = new \Proto\Id();
$Id->setId("1");
list($UserInfo, $status) = $client->GetUserInfo($Id)->wait(); var_dump($status, $UserInfo->getId(), $UserInfo->getName(), $UserInfo->getAge());
|
调用方式,看上去写法有点难受,没有go简洁,这是因为,php的调用方式都是用类
的方式呈现的,所以调用传值以及返回的都是对应的类。所以只能用setXXX()这种模式来赋值的,以及用
getXXX()方式来获取值。
我们执行一下:
1 2 3 4
| php client.php
0, 58 1, test, 31
|
我们再去go server端看下输出:
1 2 3 4
| age:18name:"jack" age:18name:"jack" age:18name:"jack" age:18name:"jack"
|
ok,php作为客户端调用go的grpc server成功!
5. Client-side streaming RPC 客户端流模式调用
什么是客户端流呢?也就是客户端在一次请求中,不断的将内容像流水一样,传给服务端。而服务端,则是需要不段的循环获取数据。
1 2
| client -1-> -2-> -3-> -4-> server client <-
|
为什么会有这种场景,因为存在一个痛点,就是客户端大包发送,
- 如果一次性发送大包,极有可能超时或者丢包,而通过流水的方式不断的发给服务端,则能保证实时性和安全性。
- 接收端还一边接收数据一边处理数据。也能保证数据的及时性。
所以,流的方式传输还是有很大的使用场景的。那我们先来看看,流式调用有啥不同的地方。
最大的不同就是在protobuf文件中,定义一个rpc 函数时候,得加一个stream
关键字,就表示这是一个流媒体的传输调用。
1 2 3 4 5
| service UserServer { rpc SaveUser (stream UserParams) returns (Id) {} rpc GetUserInfo (Id) returns (stream UserInfo) {} rpc DeleteUser(stream Id) returns (stream Status){} }
|
上面我们定义了3个rpc方法,如果是在函数的参数前,加stream
,表示是客户端发送流式的请求,反之,如果是返回的参数前面,加stream
,表示是客户端发送流式的请求。同理,2个都加,则表示是双向的,2边都在流式的发送,这种情况就复杂一些,我们分别来试一试。
5.1 go语言的客户端流调用
我们先完善一下protobuf文件,我们在proto目录下新建1个stream流式的文件,名字叫:streamArticleServer.proto
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| syntax = "proto3"; package proto; option go_package = ".;proto";
message Aid { int32 id = 1; }
message Author { string author = 1; }
message Title { string title = 1; }
message Content { string content = 1; }
message ArticleInfo { int32 id = 1; string author = 2; string title = 3; string content = 4; }
message ArticleParam { Author author = 2; Title title = 3; Content content = 4; }
message Status{ bool code = 1; }
service ArticleServer { rpc SaveArticle (stream ArticleParam) returns (Aid) {} rpc GetArticleInfo (Aid) returns (stream ArticleInfo) {} rpc DeleteArticle(stream Aid) returns (stream Status){} }
|
然后,我们执行一下,生成对应的go服务端和客户端文件:
protoc --go_out=plugins=grpc:. streamArticleServer.proto
这样,就在proto目录下,生成了一个新的streamArticleServer.pb.go
文件,grpc需要的client和server相关的调用都在这里面生成好了。
由于,我们本次只是看客户端流调用,那么我们只看SaveArticle
这个方法。
接下来,我们开始写client和server的调用代码。
我们先来完成server端的代码逻辑,server端上面的逻辑,主要分3方面:
- 新建tcp连接。
- 注册grpc服务,并把它挂到tcp上。
- 完成对外提供的几个rpc方法的逻辑。
这和普通的server是一样的,只是在处理具体的请求的时候,会用for 循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package main
import ( "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "log" "net" ) type StreamArticleServer struct { } func (server *StreamArticleServer) SaveArticle(stream proto.ArticleServer_SaveArticleServer) error { return nil } func (server *StreamArticleServer) GetArticleInfo(aid *proto.Aid, stream proto.ArticleServer_GetArticleInfoServer) error { return nil } func (server *StreamArticleServer) DeleteArticle(stream proto.ArticleServer_DeleteArticleServer) error { return nil } func main() { listen, err := net.Listen("tcp", "127.0.0.1:9527") if err != nil { log.Fatalf("tcp listen failed:%v", err) } server := grpc.NewServer() proto.RegisterArticleServerServer(server, &StreamArticleServer{}) fmt.Println("article stream Server grpc services start success") _ = server.Serve(listen) }
|
我们先把架子搭起来,值得注意的是:绑定3个方法的时候,他们的参数,和普通的模式不一样了。
我们做的时候,可以看下 RegisterArticleServerServer(s *grpc.Server, srv ArticleServerServer)
的第二个参数 ArticleServerServer
,可以看下,这个接口是怎么写的,因为我们需要实现这个接口。
接口定义如下:
1 2 3 4 5
| type ArticleServerServer interface { SaveArticle(ArticleServer_SaveArticleServer) error GetArticleInfo(*Aid, ArticleServer_GetArticleInfoServer) error DeleteArticle(ArticleServer_DeleteArticleServer) error }
|
这样,我们自己去实现这3个接口的时候,也就知道自己的参数和返回值怎么写了。这也算是一种学习方法吧。
好了。架子搭完,我们就来实现以下client流的情况下,sever的实现方式,也就是 SaveArticle
这个函数的内容实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (server *StreamArticleServer) SaveArticle(stream proto.ArticleServer_SaveArticleServer) error { for { id := rand.Int31n(100) r, err := stream.Recv() if err == io.EOF { fmt.Println("读取数据结束") res := &proto.Aid{Id: id} return stream.SendAndClose(res) } if err != nil { return err } fmt.Printf("stream.rev author: %s, title: %s, context: %s", r.Author.GetAuthor(), r.Title.GetTitle(), r.Content.GetContent()) } }
|
首先是有个for 循环
,源源不断的接受client的流数据,然后通过判断err == io.EOF
来判断客户端额流水结束。然后整体返回1个随机的ID mock假数据。这样,server端就实现完毕了。
那接着我们来实现client流怎么写。
老规矩,先定义出client的架子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package main import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "log" ) var client proto.ArticleServerClient
func main() { connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure()) if err != nil { log.Fatal("connect grpc fail") } defer connect.Close() client = proto.NewArticleServerClient(connect) } func SaveArticle() { }
func GetArticleInfo() { }
func DeleteArticle() { }
|
前面的链接grpc的部分和普通的模式是一样的。重点是本次的client流式该怎么写呢?也就是SaveArticle
方法的写法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| func SaveArticle() { //定义一组数据 SaveList := map[string]proto.ArticleParam{ "1": {Author: &proto.Author{Author: "tony"}, Title: &proto.Title{Title: "title1"}, Content: &proto.Content{Content: "content1"}}, "2": {Author: &proto.Author{Author: "jack"}, Title: &proto.Title{Title: "title2"}, Content: &proto.Content{Content: "content2"}}, "3": {Author: &proto.Author{Author: "tom"}, Title: &proto.Title{Title: "title3"}, Content: &proto.Content{Content: "content3"}}, "4": {Author: &proto.Author{Author: "boby"}, Title: &proto.Title{Title: "title4"}, Content: &proto.Content{Content: "content4"}}, } //先调用函数 stream, err := client.SaveArticle(context.Background()) if err != nil { log.Fatal("SaveArticle grpc fail", err.Error()) } //再循环发送 for _, info := range SaveList { err = stream.Send(&info) if err != nil { log.Fatal("SaveArticle Send info fail", err.Error()) } } //发送关闭新号,并且获取返回值 resp, err := stream.CloseAndRecv() if err != nil { log.Fatal("SaveArticle CloseAndRecv fail", err.Error()) } fmt.Printf("resp: id = %d", resp.GetId()) }
|
这里需要注意的是,我们搞了个map,来循环给server发送数据,然后再就是关闭新号发送,再接受数据。可以看出,和server的方法是反着来的。也比较好记。
我们测试一下。首先是启动server,启动成功:
1 2
| $ go run server/stream_server/server.go article stream Server grpc services start success
|
然后,我们编辑client.go
里面main
函数里的SaveArticle
的注释,客户端执行调用一下:
1 2
| $ go run client/stream_client/client.go resp: id = 81
|
调用成功,返回ID = 81。
再看下server那边的输出:
1 2 3 4 5 6 7 8
| $ go run server/stream_server/server.go article stream Server grpc services start success
stream.rev author: jack, title: title2, context: content2 stream.rev author: tom, title: title3, context: content3 stream.rev author: boby, title: title4, context: content4 stream.rev author: tony, title: title1, context: content1 读取数据结束
|
OK,server 也输出正常。完全联通!
当然,你说,客户端是流模式,就一定得搞个for循环去发送数据么?当然不是!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func SaveArticle2() { SaveInfo := proto.ArticleParam { Author: &proto.Author{Author: "mark"}, Title: &proto.Title{Title: "title5"}, Content: &proto.Content{Content: "content5"}, } stream, err := client.SaveArticle(context.Background()) if err != nil { log.Fatal("SaveArticle grpc fail", err.Error()) } err = stream.Send(&SaveInfo) if err != nil { log.Fatal("SaveArticle Send info fail", err.Error()) } resp, err := stream.CloseAndRecv() if err != nil { log.Fatal("SaveArticle CloseAndRecv fail", err.Error()) } fmt.Printf("resp: id = %d", resp.GetId()) }
|
这样也是可以的。相当于数组是1。循环了1次而已。
5.2 php语言的客户端流调用
我们看下php版本的client流模式如何写呢?直接上代码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?php
require __DIR__ . '/vendor/autoload.php'; SaveArticle(); function SaveArticle() { $client = new \Proto\ArticleServerClient('127.0.0.1:9527', [ 'credentials' => Grpc\ChannelCredentials::createInsecure() ]); $stream = $client->SaveArticle(); $ArticleParam = new \Proto\ArticleParam(); for ($i = 0; $i < 10; $i++) { $ArticleParam->setAuthor((new \Proto\Author())->setAuthor("kevin1")); $ArticleParam->setTitle((new \Proto\Title())->setTitle("title_php_" . $i)); $ArticleParam->setContent((new \Proto\Content())->setContent("content_php_" . $i)); $stream->write($ArticleParam); }
list($aid, $status) = $stream->wait(); var_dump($aid->getId()); }
|
我们执行一下,打印:98,同时服务端server也输出了响应的流水数据:
1 2 3 4 5 6 7 8 9 10 11
| stream.rev author: kevin1, title: title_php_0, context: content_php_0 stream.rev author: kevin1, title: title_php_1, context: content_php_1 stream.rev author: kevin1, title: title_php_2, context: content_php_2 stream.rev author: kevin1, title: title_php_3, context: content_php_3 stream.rev author: kevin1, title: title_php_4, context: content_php_4 stream.rev author: kevin1, title: title_php_5, context: content_php_5 stream.rev author: kevin1, title: title_php_6, context: content_php_6 stream.rev author: kevin1, title: title_php_7, context: content_php_7 stream.rev author: kevin1, title: title_php_8, context: content_php_8 stream.rev author: kevin1, title: title_php_9, context: content_php_9 读取数据结束
|
PHP的client的很多地方处理方式,和go client 很类似,比如,先调用client->SaveArticle()
,参数是空,啥也不传,然后再循环写入(write/Send)。然后,再发送关闭,等待结果返回。
6. 服务端流模式调用
有了前面客户端流模式的铺垫,服务端流模式就简单了很多。无非是将之前的操作反过来操作下。server不断的循环发送给client,然后client循环接受。套路是一样的。整个通讯过程就变成了这样:
1 2
| client ----1------------------> server client <-5- <-4- <-3- <-2- <-1- server
|
6.1 go语言的服务端流调用
我们先来看下go服务端代码的编写,也就是实现server.go中GetArticleInfo
函数里面的内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func (server *StreamArticleServer) GetArticleInfo(aid *proto.Aid, stream proto.ArticleServer_GetArticleInfoServer) error { for i := 0; i < 6; i++ { id := strconv.Itoa(int(aid.GetId())) err := stream.Send(&proto.ArticleInfo{ Id: aid.GetId(), Author: "jack", Title: "title_go_" + id, Content: "content_go_" + id, }) if err != nil { return err } } fmt.Println("发送完毕") return nil }
|
单纯的服务端流,就比较简单,1个for循环6次,每次send
数据即可,也不需要关闭。
我们再看下go client怎么实现呢?也就是client.go中现实GetArticleInfo
函数里面的内容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func GetArticleInfo() { Aid := proto.Aid{ Id: 2, } stream, err := client.GetArticleInfo(context.Background(), &Aid) if err != nil { log.Fatal("GetArticleInfo grpc fail", err.Error()) } for { r, err := stream.Recv() if err == io.EOF { fmt.Println("读取数据结束") break } if err != nil { log.Fatal("GetArticleInfo Recv fail", err.Error()) } fmt.Printf("stream.rev aid: %d, author: %s, title: %s, context: %s\n", r.GetId(), r.GetAuthor(), r.GetTitle(), r.GetContent()) } }
|
client代码也同样比较简单,搞个for死循环去Recv
就可以了。判断是否是EOF
了,则表示sever发送结束了,就可以跳出循环,结束。
我们运行下,看下client的输出:
1 2 3 4 5 6 7 8 9
| $ go run client/stream_client/client.go
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2 读取数据结束
|
server端的输出:
1 2 3
| $ go run server/stream_server/server.go article stream Server grpc services start success 发送完毕
|
OK ,go语言版本的client和server通宵成功,结束!
6.2 php语言的服务端流调用
由于,我们本次只要PHP作为client调用,所以,我们只看下PHP如何接受go的server流的数据,其实和前面的client server 类似,反过来即可。我们直接看下GetArticleInfo
函数代码怎么实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| function GetArticleInfo() { $client = new \Proto\ArticleServerClient('127.0.0.1:9527', [ 'credentials' => Grpc\ChannelCredentials::createInsecure() ]); $stream = $client->GetArticleInfo((new \Proto\Aid())->setId("668")); $features = $stream->responses();
foreach ($features as $feature) { echo $feature->getId() . "--" . $feature->getAuthor() . "--" . $feature->getTitle() . "--" . $feature->getContent() . PHP_EOL; } }
|
需要注意的地方就是语法的改变,普通模式是使用wait
方法就可以直接获取结果了,在服务流模式下,client写法就不一样了,得先response
,再foreach
循环这个值。
我们运行下:
1 2 3 4 5 6 7
| . ------ ------ ------ ------ ------ ------
|
OK,客户端获取数据成功,这些数据都是server通过流模式传输过来的。再看下server端的输出:
1 2 3 4
| $ go run server/stream_server/server.go article stream Server grpc services start success
668 发送完毕
|
好了,整个通讯就完成了。
7. 双向流模式调用
双向模式顾名思义,就是client和server都是流水模式,2边一起流水。
1 2
| client -1-> -2-> -3-> --4> -5-> server client <-5- <-4- <-3- <-2- <-1- server
|
通过前面的单独的流水模式,我们应该可以猜到代码该怎么写了,无非就是把之前的send啊,recv啊一起上呗,for循环也一起都用。下面开始写。
7.1 go语言的双向流流调用
首先是go语言的服务端的写法,我们直接写吧,也就是实现函数DeleteArticle
内的方法实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| func (server *StreamArticleServer) DeleteArticle(stream proto.ArticleServer_DeleteArticleServer) error { for { r, err := stream.Recv() if err == io.EOF { fmt.Println("read done!") return nil } if err != nil { return err } fmt.Printf("stream.rev aid: %d\n", r.GetId()) err = stream.Send(&proto.Status{Code: true}) if err != nil { return err } } }
|
代码也比较好理解,先1个for循环,里面先去Rev
,再去Send
。当然,反着来也是可以的。
我们再看下client怎么实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func DeleteArticle() { stream, err := client.DeleteArticle(context.Background()) if err != nil { log.Fatal("DeleteArticle grpc fail", err.Error()) } for i := 0; i < 6; i++ { err = stream.Send(&proto.Aid{Id: int32(i)}) if err != nil { log.Fatal("DeleteArticle Send fail", err.Error()) } r, err := stream.Recv() if err == io.EOF { break } if err != nil { log.Fatal("GetArticleInfo Recv fail", err.Error()) } fmt.Printf("stream.rev status: %v\n", r.GetCode()) } _ = stream.CloseSend() }
|
客户端也差不多,先来1个6for循环,然后先Send
,再Recv
。这次不能反着来,不能就阻塞了。for 循环结束后,可以主动发送一个CloseSend
,这样server就可以手动手动EOF
的信息了。
我们先运行server,再运行client,看下打印输出:
1 2 3 4 5 6 7 8 9
| #client
$ go run client/stream_client/client.go stream.rev status: true stream.rev status: true stream.rev status: true stream.rev status: true stream.rev status: true stream.rev status: true
|
再看下server端的输出:
1 2 3 4 5 6 7 8 9 10
| $ go run server/stream_server/server.go article stream Server grpc services start success
stream.rev aid: 0 stream.rev aid: 1 stream.rev aid: 2 stream.rev aid: 3 stream.rev aid: 4 stream.rev aid: 5 read done!
|
OK,通讯成功!
7.2 php语言的双向流流调用
go的client已经OK了,我们继续看下PHP作为client的情况。直接上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| function DeleteArticle() { $client = new \Proto\ArticleServerClient('127.0.0.1:9527', [ 'credentials' => Grpc\ChannelCredentials::createInsecure() ]); $stream = $client->DeleteArticle(); $AidParam = new \Proto\Aid(); for ($i = 0; $i < 6; $i++) { $AidParam->setId($i); $stream->write($AidParam); } $stream->writesDone();
while ($reply = $stream->read()) { var_dump($reply->getCode()); } }
|
写法和go的client稍有不同,先自己for 6次写,再调用writesDone
写入结束,再while循环read
,打印出code信息。
我们运行下:
1 2 3 4 5 6 7
| $ php stream.php bool(true) bool(true) bool(true) bool(true) bool(true) bool(true)
|
服务端:
1 2 3 4 5 6 7
| stream.rev aid: 0 stream.rev aid: 1 stream.rev aid: 2 stream.rev aid: 3 stream.rev aid: 4 stream.rev aid: 5 read done!
|
OK,整个grpc的通讯和夸语言的调用就结束了,还是收获满满的。接下来,我们要看下grpc tls加密通讯,以及设置超时的context,还有就是如何同时提供http的Restful的接口方式,以及如何部署,服务发现以及负载均衡的实现。
8. TLS加密通讯
上面的这些例子都是讲的明文通讯,在某些情况下很容易被截获的,还是有点危险的。因为grpc是基于http2的,所以我们看下,如何配置tls,使其支持https的特性呢?
那么回顾下,https的核心逻辑:
- server 采用非对称加密,生成一个公钥 public1 和私钥 private1
- server 把公钥 public1 传给 client
- client 采用对称加密生成1个秘钥A (或者2个秘钥A,内容都是一样)
- client 用server给自己的公钥 public1 加密自己生成的对称秘钥A。生成了一个秘钥B.
- client 把秘钥 B 传给server。
- client 用 秘钥A 加密需要传输的数据Data,并传给server。
- server 收到 秘钥B后,用自己的私钥 private1 解开了,得到了秘钥A。
- server 收到加密后的data 后,用秘钥A解开了,获得了元素数据。
简而言之,就是采用非对称加密+对称加密的方式。其中,对称加密产生的秘钥,是既可以加密,又可以解密的,加密解密速度很快。而采用非对称加密,则不可以,必须公钥解密私钥,或者私钥加密公钥,加解密速度慢。这样一个组合,就可以保障数据得到加密,又不会影响速度。
OK,知道了原理之后,我们看下具体在代码里如何实现。
首先,我们要生成server的 公钥public1
和 私钥 private1
。那就得用到openssl
命令了。
需要注意的是Go在1.15版本,X509 无法使用了,需要用Sans算法代替, 具体的操作可参考这篇文章:Go1.15下Sans 秘钥生成过程
这样,我们就得到了2个key,1个是test.pem
,它就是公钥。 1个是test.key
,它是私钥。其中,我们设置openssl.cnf中alt_names
为:
1 2 3
| [ alt_names ] DNS.1 = www.zchd.ltd DNS.2 = www.test.zchd.ltd
|
顾明思意,设置的通用名称是这个。这个在client调用中会用到,不清楚先别急。
8.1 golang中使用tls加密
我们先在golang中的server 和clent 中使用tls,看看怎么做,首先是server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| package main
import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "log" "math/rand" "net" ) type UserServer struct{} func main() { c, err := credentials.NewServerTLSFromFile("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem", "/Users/Jack/www/gowww/go-grpc-example/conf/test.key") if err != nil { log.Fatalf("new tls server err:", err.Error()) } listen, err := net.Listen("tcp", "127.0.0.1:9528") if err != nil { log.Fatalf("tcp listen failed:%v", err) } server := grpc.NewServer(grpc.Creds(c)) fmt.Println("userServer grpc services start success") proto.RegisterUserServerServer(server, &UserServer{}) _ = server.Serve(listen) }
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) { id := rand.Int31n(100) res := &proto.Id{Id: id} fmt.Printf("%+v ", params.GetAge()) fmt.Printf("%+v\n", params.GetName()) return res, nil } func (Service *UserServer) GetUserInfo(ctx context.Context, id *proto.Id) (*proto.UserInfo, error) { res := &proto.UserInfo{Id: id.GetId(), Name: "test", Age: 31} return res, nil }
|
我们可以发现,除了注册Grpc使用证书不同之外,其他的rpc函数和非tls上是一致的。我们看下client怎么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| package main import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "log" ) var client proto.UserServerClient func main() { crt, err := credentials.NewClientTLSFromFile("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem", "www.zchd.ltd") if err != nil { panic(err.Error()) } connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithTransportCredentials(crt)) if err != nil { log.Fatalln(err) } defer connect.Close() client = proto.NewUserServerClient(connect) SaveUser() } func SaveUser() { params := proto.UserParams{} params.Age = &proto.Age{Age: 31} params.Name = &proto.Name{Name: "test"} res, err := client.SaveUser(context.Background(), ¶ms) if err != nil { log.Fatalf("client.SaveUser err: %v", err) } fmt.Println(res.Id) } func GetUserInfo() { res, err := client.GetUserInfo(context.Background(), &proto.Id{Id: 1}) if err != nil { log.Fatalf("client.userInfo err: %v", err) } fmt.Printf("%+v\n", res) }
|
代码总体也很简单,需要注意的是NewClientTLSFromFile()这个函数,第一个参数需要传
pem公钥文件,第一个参数传
serverNameOverride`,也就是我们在OpenSSL.cnf里面设置DNS的名字。
我们运行一下:
1 2
| $ go run client/simple_token_client/client.go 47
|
服务端也有输出:
1 2
| $ go run server/simple_tls_server/server.go age:31 name:"test"
|
成功连接。需要注意的是2个证书的生成,涉及很多openssl命令,要注意别搞错了,这个搞错就很同意连接不成功,出现各种问题。
8.2 php client 使用tls加密 连接
老规矩,我们在PHP中的client,也可以用这种方式来加密连接一下服务端,直接上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| <?php
require __DIR__ . '/vendor/autoload.php';
$pem = file_get_contents("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem");
$client = new \Proto\UserServerClient('127.0.0.1:9528', [ 'credentials' => \Grpc\ChannelCredentials::createSsl($pem), 'grpc.ssl_target_name_override' => 'www.zchd.ltd', ]);
$UserParams = new \Proto\UserParams(); $age = new \Proto\Age(); $age->setAge(18); $UserParams->setAge($age); $name = new \Proto\Name(); $name->setName("jack"); $UserParams->setName($name);
list($Id, $status) = $client->SaveUser($UserParams)->wait(); var_dump($status->code, $Id->getId());
$Id = new \Proto\Id();
$Id->setId("1");
list($UserInfo, $status) = $client->GetUserInfo($Id)->wait(); var_dump($status->code, $UserInfo->getId(), $UserInfo->getName(), $UserInfo->getAge());
|
需要注意的是证书的导入和写法,有点却别。运行一下:
1 2 3 4 5 6 7
| $ php main_tls.php int(0) int(59) int(0) int(1) string(4) "test" int(31)
|
成功了。
9. 超时控制
我们平时在代码中通过curl调用1个http请求的时候,都会设置timeout超时,这个是非常重要的,之前笔者就经历过1个接口没设置超时时间,由于1个接口读取时间很长,导致请求长时间等待,由于http请求堆积太多导致,服务线程池飙升,就导致节点死机。所以这是很严重的一个事情。
那么,我们再client中去调用grpc的服务请求的时候,也应该要设置超时时间,这个超时可以通过context来实现。
所以,核心是用到context这个包,来设置,有2种方式,都可以:
1 2 3 4 5
| ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
|
直接上client代码吧:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| package main
import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "log" "time" ) var client proto.UserServerClient var ctx context.Context var cancel context.CancelFunc func main() { connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure()) if err != nil { log.Fatalln(err) } defer connect.Close() ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second) defer cancel() client = proto.NewUserServerClient(connect) SaveUser() } func SaveUser() { params := proto.UserParams{} params.Age = &proto.Age{Age: 31} params.Name = &proto.Name{Name: "test"} fmt.Println(time.Now().Format("2006-01-02 15:04:05")) res, err := client.SaveUser(ctx, ¶ms) fmt.Println(err) if err != nil { got := status.Code(err) if got == codes.DeadlineExceeded { log.Println("client.SaveUser err: deadline") } log.Printf("client.SaveUser err: %+v", err) } else { fmt.Println(res.Id) } }
|
client先是设置了context的WithTimeout
时间为1秒,然后判断调用grpc函数SaveUser
的错误返回值,如果限制超时,就终止请求。
server端其实也需要对这个超时时间做及时的判断,因为,server端可能请求了很多协程服务,client已经停止了,那么server端也应该要及时的停止了,而不是还在后端运行和计算,这样也可以节省服务器很多资源:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package main import ( "context" "fmt" "go-grpc-example/proto" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "log" "math/rand" "net" "time" ) type UserServer struct{} func main() { listen, err := net.Listen("tcp", "127.0.0.1:9527") if err != nil { log.Fatalf("tcp listen failed:%v", err) } server := grpc.NewServer() fmt.Println("userServer grpc services start success") proto.RegisterUserServerServer(server, &UserServer{}) _ = server.Serve(listen) }
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) { time.Sleep(3*time.Second) timeD, ok := ctx.Deadline() if ok { fmt.Println(timeD.Format("2006-01-02 15:04:05"), ctx.Err()) return nil, status.Errorf(codes.Canceled, "UserServer.SaveUser Deadline") } id := rand.Int31n(100) res := &proto.Id{Id: id} fmt.Printf("%+v, ", params.GetAge()) fmt.Printf("%+v\n", params.GetName()) return res, nil }
|
我们再server端,模拟了3秒超时。
ok,我们运行一下:
1 2 3 4 5 6 7 8 9
| $ go run client/simple_timeout_client/client.go 2021-01-11 18:20:22 2021/01/11 18:20:23 client.SaveUser err: deadline 2021/01/11 18:20:23 client.SaveUser err: rpc error: code = DeadlineExceeded desc = context deadline exceeded
$ go run server/simple_timeout_server/server.go userServer grpc services start success
2021-01-11 18:20:23 context deadline exceeded
|
可以看到,在1秒后,deadline了。成功!
当然,也可以用select + ctx.Done()
的模式,来监听client的取消事件的:
关键代码如下:
1 2 3 4 5
| select { case <-ctx.Done(): fmt.Println("ctx.Done", ctx.Err()) return nil, status.Errorf(codes.Canceled, "UserServer.SaveUser Deadline") }
|
参考文章:
1.https://eddycjy.com/posts/go/grpc/2018-09-22-install/
2.https://www.cnblogs.com/xiangxiaolin/p/12791281.html
3.https://github.com/eddycjy/go-grpc-example
4.php grpc 官方demo https://github.com/grpc/grpc/tree/v1.34.0/examples/php
5.Go 1.15 解决GRPC X509 https://blog.csdn.net/cuichenghd/article/details/109230584
转自:https://www.zybuluo.com/phper/note/1764719