go和php中的gRpc实战

目录
  1. 1. 1. 需求分析
  2. 2. 2. 编写ptorobuf文件
  3. 3. 3. 编译成go版本的服务端和php版本的客户端文件
    1. 3.1. 3.1 编译成go版本
    2. 3.2. 3.2 编译成php客户端
  4. 4. 4. 普通模式调用
    1. 4.1. 4.1 go语言的调用实现
    2. 4.2. 4.2 php语言的客户端调用
  5. 5. 5. Client-side streaming RPC 客户端流模式调用
    1. 5.1. 5.1 go语言的客户端流调用
    2. 5.2. 5.2 php语言的客户端流调用
  6. 6. 6. 服务端流模式调用
    1. 6.1. 6.1 go语言的服务端流调用
    2. 6.2. 6.2 php语言的服务端流调用
  7. 7. 7. 双向流模式调用
    1. 7.1. 7.1 go语言的双向流流调用
    2. 7.2. 7.2 php语言的双向流流调用
  8. 8. 8. TLS加密通讯
    1. 8.1. 8.1 golang中使用tls加密
    2. 8.2. 8.2 php client 使用tls加密 连接
  9. 9. 9. 超时控制

研究微服务治理,苦于找不到相对于php合适的rpc框架。多个服务起来,用curl或者guzzle相互域名请求简直太逆天奇葩了。由于服务大部分都是php的,没有常驻后台的daemon开发送心跳。因此对于rpc的选型也是个很严肃的问题。
看过yar、dubbo等协议,但是不是特别理想。yar本身是php的扩展。但是不支持服务治理,或者说只是一个远程过程调用的工具。dubbo主力支持java,其他语言虽然支持,但本质对于php来说和curl没大区别。今儿实战演示一下gRpc的几种调用通讯模式(普通、客户端流、服务端流、双向流)。以及和PHP客户端的联通调用。

1. 需求分析

我们这次只搞个很简单的需求,搞个用户server系统,提供2个接口给外部,1个是保存用户信息,1个是根据用户UID查询用户信息,就这2个,不搞复杂了。

简单的用代码描述下就是:

1
2
function1 saveUser(name, age)  return  (UID)
function1 getUserInfo(UID) return (UID, name, age)

ok,需求分析结束。

2. 编写ptorobuf文件

需求分析结束之后,我们明确了我们需要干什么了,我们需要对外提供2个grp接口,那么我们就开始编写protobuf吧:

先定义2个rpc服务函数

1
2
3
4
service UserServer  {
rpc SaveUser (UserParams) returns (Id) {}
rpc GetUserInfo (Id) returns (UserInfo) {}
}

定义好了rpc之后,我们知道,函数里面的参数和返回值得都得是message信息,OK,我们就开始创建参数和返回值对应的3个message吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//用户ID
message Id {
int32 id = 1;
}

//名字
message Name {
string name = 1;
}

//年龄
message Age {
int32 age = 1;
}

// 用户信息
message UserInfo {
int32 id = 1;
string name = 2;
int32 age = 3;
}

// 用户参数
message UserParams {
Name name = 1;
Age age = 2;
}

上面的写法是,将age和name单独出来搞成2个message,然后在UserParams里面搞成嵌套message,目的是想看下嵌套message在不同语言中的用法。但是总体也是比较简单的。

完整的ptoro文件为:

1
2
3
4
5
6
7
8
9
//userServer.proto

syntax = "proto3";
package proto;
option go_package = ".;proto";

message ...

service ...

3. 编译成go版本的服务端和php版本的客户端文件

编写完成protobuf文件了,接下来就是编译成不同的语言了,我们将使用protoc命令,来编译生成不同语言的版本库。我们用go语言作为服务端语言,用php和go分别作为客户端语言,完成本次的调用。

如果你机器上还未安装protobuf工具,安装很简单,如果是Mac的话,一条命令就搞定了:

brew install protobuf

如果是其他系统,请参考官方文档,也很简单。

安装好的服务,命令执行的关键字是 protoc --go_out= xx xx

3.1 编译成go版本

先编译生成go版本的服务端和客户端:

protoc --go_out=plugins=grpc:. userServer.proto

需要注意的是我们需要加上grpc的支持,生成grpc服务的代码。如果你执行报错,可能是protoc-gen-go扩展没安装,安装很简单:

go get -u github.com/golang/protobuf/protoc-gen-go

它会在$GOPATH/bin目录下生成1个可执行的protoc-gen-go文件。所以,这个文件不要删了。不然你使用--go_out 时,会找不到protoc-gen-go,提示报错。

OK,执行完毕之后,就会在proto目录下生成userServer.pg.go的文件,里面将proto里的message都转换成了go语言的struct,并且也把rpc也转换生成了2个可调用的客户端函数。

我们看下生成后目录结构,生成的ph.go文件在proto目录下。

1
2
3
4
5
6
7
8
9
10
11
12
.
├── client
│ └── simple_client
│ └── client.go
├── go.mod
├── go.sum
├── proto
│ ├── userServer.pb.go
│ └── userServer.proto
└── server
└── simple_server
└── server.go

3.2 编译成php客户端

我们在PHP里面去调用go提供的grpc服务,那么PHP就是一个客户端,同理,在PHP里面使用,其实也需要编译这个protobug文件,需要用到--php_out这个参数。

利用prcl快速安装protobuf和grpc这2个php扩展

1
2
sudo pecl install protobuf
sudo pecl install grpc

下载grpc源码,这一步是为了生成php的proto生成器,可以给我们生成client服务代码,当然你也可以自己去写代码,不用这个生成器。但是对于新手,我建议你下载安装。

1
2
3
git clone -b v1.34.0 https://github.com/grpc/grpc  #可能需要一段时间
git submodule update --init #可能需要一段时间
make grpc_php_plugin

php生成器位置生成在:/Users/Jack/www/grpc/bins/opt/grpc_php_plugin

我们把同一份userServer.proto文件,拷贝到我们的php环境目录下。然后执行命令,生成php和grpc服务类php文件:

1
protoc -I=. userServer.proto --php_out=.  --grpc_out=.  --plugin=protoc-gen-grpc=/Users/Jack/www/grpc/bins/opt/grpc_php_plugin

在项目更目录下新建 composer.json 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"name": "grpc-go-php",
"require": {
"grpc/grpc": "^v1.34.0",
"google/protobuf": "^v3.14.0"
},
"autoload": {
"psr-4": {
"GPBMetadata\\": "GPBMetadata/", //自动生成的php类文件夹
"Proto\\": "Proto/" //proto的文件夹
}
}
}

执行下载2个依赖的库。这2个库类似于sdk,它们封装了很多方法方便我们应用层去使用,它们底层会去调用上面用prcl生成的2个php扩展的方法。

composer install

这一通,完成后,我们看下目录结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
├──  GPBMetadata
│ └── UserServer.php
├── Proto
│ ├── Age.php
│ ├── Id.php
│ ├── Name.php
│ ├── UserInfo.php
│ ├── UserParams.php
│ └── UserServerClient.php
├── composer.json
├── composer.lock
├── main.php
├── userServer.proto
└── vendor
├── autoload.php
├── composer
├── google
│ └── protobuf

└── grpc
└─

其中GPBMetadataProto文件夹是自动生成的。vendor里的2个扩展也是composer自动下载生成的。

OK,一切准备好了,go版本的服务端和客户端准备就绪。php版本的客户端也准备就绪。

4. 普通模式调用

普通模式,也叫一元模式,它是最常见,也是使用做多的方式,和普通的http请求模式是一样的。

客户端像服务端发送1次请求,然后服务端给出响应结果。很简单的模式。

1
2
client --1---> server
client <--2--- server

OK,那我们来看下,我们如何实现这种普通模式的调用。

4.1 go语言的调用实现

我们先用go来实现,上面go的服务端和客户端代码都生成好了。我们先在serverclient目录下,分别新建自己的代码:

1
2
3
4
5
6
7
.
├── client
│ └── simple_client
│ └── client.go
└── server
└── simple_server
└── server.go

我们先来完成server端的代码逻辑,server端上面的逻辑,主要分3方面:

  1. 新建tcp连接。
  2. 注册grpc服务,并把它挂到tcp上。
  3. 完成对外提供的几个rpc方法的逻辑。

我们就按照这个逻辑来开始写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"log"
"math/rand"
"net"
)
//新建1个结构体,下面绑定了2个方法,实现了UserServerServer接口。
type UserServer struct{}
func main() {
//监听tcp
listen, err := net.Listen("tcp", "127.0.0.1:9527")
if err != nil {
log.Fatalf("tcp listen failed:%v", err)
}
//新建grpc
server := grpc.NewServer()
fmt.Println("userServer grpc services start success")
//rcp方法注册到grpc
proto.RegisterUserServerServer(server, &UserServer{})
//监听tcp
_ = server.Serve(listen)
}

//保存用户
//第一个参数是固定的context
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) {
id := rand.Int31n(100) //随机生成id 模式保存成功
res := &proto.Id{Id: id}
fmt.Printf("%+v", params.GetAge())
fmt.Printf("%+v\n", params.GetName())
return res, nil
}

//获取用户信息
//第一个参数是固定的context
func (Service *UserServer) GetUserInfo(ctx context.Context, id *proto.Id) (*proto.UserInfo, error) {
res := &proto.UserInfo{Id: id.GetId(), Name: "test", Age: 31}
return res, nil
}

2个rpc方法很简单,只是mock数据,并没有真实实现。后期有时间,再来实现吧。值得注意是,rpc的函数,第一个参数是固定的ctx context.Context,这是用于控制信号和超时的,是固定写法。有空我专门来搞一起context包的学习

我们运行一下, grpc服务启动成功:

1
2
$ go run server/simple_server/server.go
userServer grpc services start success

在来看看client端,如何利用生成的pb.go文件,来实现逻辑呢?也是一样

  1. 监听server启动的tcp的ip:端口。
  2. 新建连接client服务,并绑定tcp。
  3. 去调用这2个rpc的函数。

开始写逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"log"
)
var client proto.UserServerClient
func main() {
//链接tcp端口
connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure())
if err != nil {
log.Fatalln(err)
}
//新建client
client = proto.NewUserServerClient(connect)
//调用
SaveUser()
GetUserInfo()
}
func SaveUser() {
params := proto.UserParams{}
params.Age = &proto.Age{Age: 31}
params.Name = &proto.Name{Name: "test"}
res, err := client.SaveUser(context.Background(), &params)
if err != nil {
log.Fatalf("client.SaveUser err: %v", err)
}
fmt.Printf("%+v\n", res.Id)
}
func GetUserInfo() {
res, err := client.GetUserInfo(context.Background(), &proto.Id{Id: 1})
if err != nil {
log.Fatalf("client.userInfo err: %v", err)
}
fmt.Printf("%+v\n", res)
}

调用的rpc的方法,是pb.go文件里已经帮我们生成了,我们直接调用即可。这2个函数的参数和返回值,和刚在server定义的得保持一致。第一个参数得是context.Context

我们测试一下client的代码,是否可以调通:

1
2
3
$ go run client/simple_client/client.go
23
id:1 name:”test” age:31

我们可以看到有返回值了,在server那边也有了打印:

1
2
3
4
5
6
$ go run server/simple_server/server.go
userServer grpc services start success
age:31name:"test"
age:31name:"test"
age:31name:"test"
age:31name:"test"

ok,到此为止,go版本的客户端和服务端的grpc通讯成功了。

4.2 php语言的客户端调用

我们再用php来调用go的grpc服务,看下具体是怎么操作。首先,我们在上面已经自动帮我们生成了grpc的client的server类,那我们就直接调用。

操作逻辑,和go client的步骤是一样的,分成2步:

  1. 监听server启动的tcp的ip:端口,并直接新建连接client服务
  2. 去调用这2个rpc的函数。

我们具体看下,代码应该怎么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<?php
//引入 composer 的自动载加
require __DIR__ . '/vendor/autoload.php';
//连接Grpc服务端
//并连接客户端
$client = new \Proto\UserServerClient('127.0.0.1:9527', [
'credentials' => Grpc\ChannelCredentials::createInsecure()
]);

//实例化 $UserParams 请求类
$UserParams = new \Proto\UserParams();
$age = new \Proto\Age();
$age->setAge("18");
$UserParams->setAge($age);
$name = new \Proto\Name();
$name->setName("jack");
$UserParams->setName($name);

//调用远程服务
/**
* @var $Id \Proto\Id
*/
list($Id, $status) = $client->SaveUser($UserParams)->wait();
var_dump($status, $Id->getId());

//实例化Id类
$Id = new \Proto\Id();

//赋值
$Id->setId("1");

//调用
/**
* @var $User \Proto\UserInfo
*/
list($UserInfo, $status) = $client->GetUserInfo($Id)->wait();
var_dump($status, $UserInfo->getId(), $UserInfo->getName(), $UserInfo->getAge());

调用方式,看上去写法有点难受,没有go简洁,这是因为,php的调用方式都是用的方式呈现的,所以调用传值以及返回的都是对应的类。所以只能用setXXX()这种模式来赋值的,以及用getXXX()方式来获取值。

我们执行一下:

1
2
3
4
php client.php

0, 58
1, test, 31

我们再去go server端看下输出:

1
2
3
4
age:18name:"jack"
age:18name:"jack"
age:18name:"jack"
age:18name:"jack"

ok,php作为客户端调用go的grpc server成功!

5. Client-side streaming RPC 客户端流模式调用

什么是客户端流呢?也就是客户端在一次请求中,不断的将内容像流水一样,传给服务端。而服务端,则是需要不段的循环获取数据。

1
2
client -1->  -2->  -3->  -4-> server
client <--------1---------- server

为什么会有这种场景,因为存在一个痛点,就是客户端大包发送,

  1. 如果一次性发送大包,极有可能超时或者丢包,而通过流水的方式不断的发给服务端,则能保证实时性和安全性。
  2. 接收端还一边接收数据一边处理数据。也能保证数据的及时性。

所以,流的方式传输还是有很大的使用场景的。那我们先来看看,流式调用有啥不同的地方。

最大的不同就是在protobuf文件中,定义一个rpc 函数时候,得加一个stream关键字,就表示这是一个流媒体的传输调用。

1
2
3
4
5
service UserServer  {
rpc SaveUser (stream UserParams) returns (Id) {} //客户端流
rpc GetUserInfo (Id) returns (stream UserInfo) {} //服务端流
rpc DeleteUser(stream Id) returns (stream Status){} //双向流
}

上面我们定义了3个rpc方法,如果是在函数的参数前,加stream,表示是客户端发送流式的请求,反之,如果是返回的参数前面,加stream,表示是客户端发送流式的请求。同理,2个都加,则表示是双向的,2边都在流式的发送,这种情况就复杂一些,我们分别来试一试。

5.1 go语言的客户端流调用

我们先完善一下protobuf文件,我们在proto目录下新建1个stream流式的文件,名字叫:streamArticleServer.proto:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
syntax = "proto3";
package proto;
option go_package = ".;proto";
//ID
message Aid {
int32 id = 1;
}
//作者
message Author {
string author = 1;
}
//标题
message Title {
string title = 1;
}
//内容
message Content {
string content = 1;
}
// 文章信息
message ArticleInfo {
int32 id = 1;
string author = 2;
string title = 3;
string content = 4;
}
// 保存文章信息
message ArticleParam {
Author author = 2;
Title title = 3;
Content content = 4;
}
//删除状态
message Status{
bool code = 1;
}
// 声明那些方法可以使用rpc
service ArticleServer {
rpc SaveArticle (stream ArticleParam) returns (Aid) {}
rpc GetArticleInfo (Aid) returns (stream ArticleInfo) {}
rpc DeleteArticle(stream Aid) returns (stream Status){}
}
//执行 : protoc --go_out=plugins=grpc:. streamArticleServer.proto

然后,我们执行一下,生成对应的go服务端和客户端文件:

protoc --go_out=plugins=grpc:. streamArticleServer.proto

这样,就在proto目录下,生成了一个新的streamArticleServer.pb.go文件,grpc需要的client和server相关的调用都在这里面生成好了。

由于,我们本次只是看客户端流调用,那么我们只看SaveArticle这个方法。

接下来,我们开始写client和server的调用代码。

我们先来完成server端的代码逻辑,server端上面的逻辑,主要分3方面:

  1. 新建tcp连接。
  2. 注册grpc服务,并把它挂到tcp上。
  3. 完成对外提供的几个rpc方法的逻辑。

这和普通的server是一样的,只是在处理具体的请求的时候,会用for 循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main
//流式服务
import (
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"log"
"net"
)
type StreamArticleServer struct {
}
func (server *StreamArticleServer) SaveArticle(stream proto.ArticleServer_SaveArticleServer) error {
return nil
}
func (server *StreamArticleServer) GetArticleInfo(aid *proto.Aid, stream proto.ArticleServer_GetArticleInfoServer) error {
return nil
}
func (server *StreamArticleServer) DeleteArticle(stream proto.ArticleServer_DeleteArticleServer) error {
return nil
}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:9527")
if err != nil {
log.Fatalf("tcp listen failed:%v", err)
}
server := grpc.NewServer()
proto.RegisterArticleServerServer(server, &StreamArticleServer{})
fmt.Println("article stream Server grpc services start success")
_ = server.Serve(listen)
}

我们先把架子搭起来,值得注意的是:绑定3个方法的时候,他们的参数,和普通的模式不一样了

我们做的时候,可以看下 RegisterArticleServerServer(s *grpc.Server, srv ArticleServerServer)的第二个参数 ArticleServerServer,可以看下,这个接口是怎么写的,因为我们需要实现这个接口。

接口定义如下:

1
2
3
4
5
type ArticleServerServer  interface  {
SaveArticle(ArticleServer_SaveArticleServer) error
GetArticleInfo(*Aid, ArticleServer_GetArticleInfoServer) error
DeleteArticle(ArticleServer_DeleteArticleServer) error
}

这样,我们自己去实现这3个接口的时候,也就知道自己的参数和返回值怎么写了。这也算是一种学习方法吧。

好了。架子搭完,我们就来实现以下client流的情况下,sever的实现方式,也就是 SaveArticle这个函数的内容实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (server *StreamArticleServer) SaveArticle(stream proto.ArticleServer_SaveArticleServer) error {
for {
id := rand.Int31n(100)
r, err := stream.Recv()
if err == io.EOF {
fmt.Println("读取数据结束")
res := &proto.Aid{Id: id}
return stream.SendAndClose(res)
}
if err != nil {
return err
}
fmt.Printf("stream.rev author: %s, title: %s, context: %s", r.Author.GetAuthor(), r.Title.GetTitle(), r.Content.GetContent())
}
}

首先是有个for 循环,源源不断的接受client的流数据,然后通过判断err == io.EOF来判断客户端额流水结束。然后整体返回1个随机的ID mock假数据。这样,server端就实现完毕了。

那接着我们来实现client流怎么写。

老规矩,先定义出client的架子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"log"
)
var client proto.ArticleServerClient

func main() {
connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure())
if err != nil {
log.Fatal("connect grpc fail")
}
defer connect.Close()
client = proto.NewArticleServerClient(connect)
//SaveArticle()
//GetArticleInfo()
//DeleteArticle()
}
func SaveArticle() {
}

func GetArticleInfo() {
}

func DeleteArticle() {
}

前面的链接grpc的部分和普通的模式是一样的。重点是本次的client流式该怎么写呢?也就是SaveArticle方法的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func SaveArticle()  {
//定义一组数据
SaveList := map[string]proto.ArticleParam{
"1": {Author: &proto.Author{Author: "tony"}, Title: &proto.Title{Title: "title1"}, Content: &proto.Content{Content: "content1"}},
"2": {Author: &proto.Author{Author: "jack"}, Title: &proto.Title{Title: "title2"}, Content: &proto.Content{Content: "content2"}},
"3": {Author: &proto.Author{Author: "tom"}, Title: &proto.Title{Title: "title3"}, Content: &proto.Content{Content: "content3"}},
"4": {Author: &proto.Author{Author: "boby"}, Title: &proto.Title{Title: "title4"}, Content: &proto.Content{Content: "content4"}},
}
//先调用函数
stream, err := client.SaveArticle(context.Background())
if err != nil {
log.Fatal("SaveArticle grpc fail", err.Error())
}
//再循环发送
for _, info := range SaveList {
err = stream.Send(&info)
if err != nil {
log.Fatal("SaveArticle Send info fail", err.Error())
}
}
//发送关闭新号,并且获取返回值
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal("SaveArticle CloseAndRecv fail", err.Error())
}
fmt.Printf("resp: id = %d", resp.GetId())
}

这里需要注意的是,我们搞了个map,来循环给server发送数据,然后再就是关闭新号发送,再接受数据。可以看出,和server的方法是反着来的。也比较好记。

我们测试一下。首先是启动server,启动成功:

1
2
$ go run server/stream_server/server.go
article stream Server grpc services start success

然后,我们编辑client.go里面main函数里的SaveArticle的注释,客户端执行调用一下:

1
2
$ go run client/stream_client/client.go
resp: id = 81

调用成功,返回ID = 81。

再看下server那边的输出:

1
2
3
4
5
6
7
8
$ go run server/stream_server/server.go
article stream Server grpc services start success

stream.rev author: jack, title: title2, context: content2
stream.rev author: tom, title: title3, context: content3
stream.rev author: boby, title: title4, context: content4
stream.rev author: tony, title: title1, context: content1
读取数据结束

OK,server 也输出正常。完全联通!

当然,你说,客户端是流模式,就一定得搞个for循环去发送数据么?当然不是!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func SaveArticle2()  {
//定义一组数据
SaveInfo := proto.ArticleParam {
Author: &proto.Author{Author: "mark"}, Title: &proto.Title{Title: "title5"}, Content: &proto.Content{Content: "content5"},
}
//先调用函数
stream, err := client.SaveArticle(context.Background())
if err != nil {
log.Fatal("SaveArticle grpc fail", err.Error())
}
//发送
err = stream.Send(&SaveInfo)
if err != nil {
log.Fatal("SaveArticle Send info fail", err.Error())
}
////发送关闭新号,并且获取返回值
resp, err := stream.CloseAndRecv()
if err != nil {
log.Fatal("SaveArticle CloseAndRecv fail", err.Error())
}
fmt.Printf("resp: id = %d", resp.GetId())
}

这样也是可以的。相当于数组是1。循环了1次而已。

5.2 php语言的客户端流调用

我们看下php版本的client流模式如何写呢?直接上代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?php
//引入 composer 的自动载加
require __DIR__ . '/vendor/autoload.php';
SaveArticle();
function SaveArticle()
{
//连接 gRpc服务端
$client = new \Proto\ArticleServerClient('127.0.0.1:9527', [
'credentials' => Grpc\ChannelCredentials::createInsecure()
]);
//请求 SaveArticle 方法
$stream = $client->SaveArticle();
$ArticleParam = new \Proto\ArticleParam();
//循环流式写入数据
for ($i = 0; $i < 10; $i++) {
$ArticleParam->setAuthor((new \Proto\Author())->setAuthor("kevin1"));
$ArticleParam->setTitle((new \Proto\Title())->setTitle("title_php_" . $i));
$ArticleParam->setContent((new \Proto\Content())->setContent("content_php_" . $i));
$stream->write($ArticleParam);
}
//关闭并返回结果
/**
* @var $aid \proto\Aid
*/
list($aid, $status) = $stream->wait();
//打印AID
var_dump($aid->getId());
}

我们执行一下,打印:98,同时服务端server也输出了响应的流水数据:

1
2
3
4
5
6
7
8
9
10
11
stream.rev author: kevin1, title: title_php_0, context: content_php_0
stream.rev author: kevin1, title: title_php_1, context: content_php_1
stream.rev author: kevin1, title: title_php_2, context: content_php_2
stream.rev author: kevin1, title: title_php_3, context: content_php_3
stream.rev author: kevin1, title: title_php_4, context: content_php_4
stream.rev author: kevin1, title: title_php_5, context: content_php_5
stream.rev author: kevin1, title: title_php_6, context: content_php_6
stream.rev author: kevin1, title: title_php_7, context: content_php_7
stream.rev author: kevin1, title: title_php_8, context: content_php_8
stream.rev author: kevin1, title: title_php_9, context: content_php_9
读取数据结束

PHP的client的很多地方处理方式,和go client 很类似,比如,先调用client->SaveArticle(),参数是空,啥也不传,然后再循环写入(write/Send)。然后,再发送关闭,等待结果返回。

6. 服务端流模式调用

有了前面客户端流模式的铺垫,服务端流模式就简单了很多。无非是将之前的操作反过来操作下。server不断的循环发送给client,然后client循环接受。套路是一样的。整个通讯过程就变成了这样:

1
2
client ----1------------------> server
client <-5- <-4- <-3- <-2- <-1- server

6.1 go语言的服务端流调用

我们先来看下go服务端代码的编写,也就是实现server.go中GetArticleInfo函数里面的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (server *StreamArticleServer) GetArticleInfo(aid *proto.Aid, stream proto.ArticleServer_GetArticleInfoServer) error {
for i := 0; i < 6; i++ {
id := strconv.Itoa(int(aid.GetId()))
err := stream.Send(&proto.ArticleInfo{
Id: aid.GetId(),
Author: "jack",
Title: "title_go_" + id,
Content: "content_go_" + id,
})
if err != nil {
return err
}
}
fmt.Println("发送完毕")
return nil
}

单纯的服务端流,就比较简单,1个for循环6次,每次send数据即可,也不需要关闭。

我们再看下go client怎么实现呢?也就是client.go中现实GetArticleInfo函数里面的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func GetArticleInfo() {
Aid := proto.Aid{
Id: 2,
}
//请求
stream, err := client.GetArticleInfo(context.Background(), &Aid)
if err != nil {
log.Fatal("GetArticleInfo grpc fail", err.Error())
}
//循环接受server流发来数据
for {
r, err := stream.Recv()
if err == io.EOF {
fmt.Println("读取数据结束")
break
}
if err != nil {
log.Fatal("GetArticleInfo Recv fail", err.Error())
}
fmt.Printf("stream.rev aid: %d, author: %s, title: %s, context: %s\n", r.GetId(), r.GetAuthor(), r.GetTitle(), r.GetContent())
}
}

client代码也同样比较简单,搞个for死循环去Recv就可以了。判断是否是EOF了,则表示sever发送结束了,就可以跳出循环,结束。

我们运行下,看下client的输出:

1
2
3
4
5
6
7
8
9
$ go run client/stream_client/client.go

stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
stream.rev aid: 2, author: jack, title: title_go_2, context: content_go_2
读取数据结束

server端的输出:

1
2
3
$ go run server/stream_server/server.go
article stream Server grpc services start success
发送完毕

OK ,go语言版本的client和server通宵成功,结束!

6.2 php语言的服务端流调用

由于,我们本次只要PHP作为client调用,所以,我们只看下PHP如何接受go的server流的数据,其实和前面的client server 类似,反过来即可。我们直接看下GetArticleInfo函数代码怎么实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
function GetArticleInfo()
{
//连接 gRpc服务端
$client = new \Proto\ArticleServerClient('127.0.0.1:9527', [
'credentials' => Grpc\ChannelCredentials::createInsecure()
]);
//请求 SaveArticle 方法
$stream = $client->GetArticleInfo((new \Proto\Aid())->setId("668"));
//获取服务流的数据
$features = $stream->responses();
//循环遍历打印出来
/**
* @var $feature \proto\ArticleInfo
*/
foreach ($features as $feature) {
echo $feature->getId() . "--" . $feature->getAuthor() . "--" . $feature->getTitle() . "--" . $feature->getContent() . PHP_EOL;
}
}

需要注意的地方就是语法的改变,普通模式是使用wait方法就可以直接获取结果了,在服务流模式下,client写法就不一样了,得先response,再foreach循环这个值。

我们运行下:

1
2
3
4
5
6
7
$ php stream.php
668--jack--title_go_668--content_go_668
668--jack--title_go_668--content_go_668
668--jack--title_go_668--content_go_668
668--jack--title_go_668--content_go_668
668--jack--title_go_668--content_go_668
668--jack--title_go_668--content_go_668

OK,客户端获取数据成功,这些数据都是server通过流模式传输过来的。再看下server端的输出:

1
2
3
4
$ go run server/stream_server/server.go
article stream Server grpc services start success

668 发送完毕

好了,整个通讯就完成了。

7. 双向流模式调用

双向模式顾名思义,就是client和server都是流水模式,2边一起流水。

1
2
client -1->  -2->  -3->  --4>  -5-> server
client <-5- <-4- <-3- <-2- <-1- server

通过前面的单独的流水模式,我们应该可以猜到代码该怎么写了,无非就是把之前的send啊,recv啊一起上呗,for循环也一起都用。下面开始写。

7.1 go语言的双向流流调用

首先是go语言的服务端的写法,我们直接写吧,也就是实现函数DeleteArticle内的方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//双端
func (server *StreamArticleServer) DeleteArticle(stream proto.ArticleServer_DeleteArticleServer) error {
for {
//循环接收client发送的流数据
r, err := stream.Recv()
if err == io.EOF {
fmt.Println("read done!")
return nil
}
if err != nil {
return err
}
fmt.Printf("stream.rev aid: %d\n", r.GetId())
//循环发流数据给client
err = stream.Send(&proto.Status{Code: true})
if err != nil {
return err
}
//fmt.Println("send done!")
}
}

代码也比较好理解,先1个for循环,里面先去Rev,再去Send。当然,反着来也是可以的。

我们再看下client怎么实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//双向流
func DeleteArticle() {
//链接rpc
stream, err := client.DeleteArticle(context.Background())
if err != nil {
log.Fatal("DeleteArticle grpc fail", err.Error())
}
for i := 0; i < 6; i++ {
//先发
err = stream.Send(&proto.Aid{Id: int32(i)})
if err != nil {
log.Fatal("DeleteArticle Send fail", err.Error())
}
//再收
r, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatal("GetArticleInfo Recv fail", err.Error())
}
fmt.Printf("stream.rev status: %v\n", r.GetCode())
}
//发送结束
_ = stream.CloseSend()
}

客户端也差不多,先来1个6for循环,然后先Send,再Recv。这次不能反着来,不能就阻塞了。for 循环结束后,可以主动发送一个CloseSend,这样server就可以手动手动EOF的信息了。

我们先运行server,再运行client,看下打印输出:

1
2
3
4
5
6
7
8
9
#client

$ go run client/stream_client/client.go
stream.rev status: true
stream.rev status: true
stream.rev status: true
stream.rev status: true
stream.rev status: true
stream.rev status: true

再看下server端的输出:

1
2
3
4
5
6
7
8
9
10
$ go run server/stream_server/server.go
article stream Server grpc services start success

stream.rev aid: 0
stream.rev aid: 1
stream.rev aid: 2
stream.rev aid: 3
stream.rev aid: 4
stream.rev aid: 5
read done!

OK,通讯成功!

7.2 php语言的双向流流调用

go的client已经OK了,我们继续看下PHP作为client的情况。直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
function DeleteArticle()
{
//连接 gRpc服务端
$client = new \Proto\ArticleServerClient('127.0.0.1:9527', [
'credentials' => Grpc\ChannelCredentials::createInsecure()
]);
//请求 SaveArticle 方法
$stream = $client->DeleteArticle();
$AidParam = new \Proto\Aid();
//循环流式写入数据
for ($i = 0; $i < 6; $i++) {
$AidParam->setId($i);
$stream->write($AidParam);
}
//写入结束
$stream->writesDone();
/**
* @var $reply \proto\Status
*/
while ($reply = $stream->read()) {
var_dump($reply->getCode());
}
}

写法和go的client稍有不同,先自己for 6次写,再调用writesDone写入结束,再while循环read,打印出code信息。

我们运行下:

1
2
3
4
5
6
7
$ php stream.php
bool(true)
bool(true)
bool(true)
bool(true)
bool(true)
bool(true)

服务端:

1
2
3
4
5
6
7
stream.rev aid:  0
stream.rev aid: 1
stream.rev aid: 2
stream.rev aid: 3
stream.rev aid: 4
stream.rev aid: 5
read done!

OK,整个grpc的通讯和夸语言的调用就结束了,还是收获满满的。接下来,我们要看下grpc tls加密通讯,以及设置超时的context,还有就是如何同时提供http的Restful的接口方式,以及如何部署,服务发现以及负载均衡的实现。

8. TLS加密通讯

上面的这些例子都是讲的明文通讯,在某些情况下很容易被截获的,还是有点危险的。因为grpc是基于http2的,所以我们看下,如何配置tls,使其支持https的特性呢?

那么回顾下,https的核心逻辑:

  1. server 采用非对称加密,生成一个公钥 public1 和私钥 private1
  2. server 把公钥 public1 传给 client
  3. client 采用对称加密生成1个秘钥A (或者2个秘钥A,内容都是一样)
  4. client 用server给自己的公钥 public1 加密自己生成的对称秘钥A。生成了一个秘钥B.
  5. client 把秘钥 B 传给server。
  6. client 用 秘钥A 加密需要传输的数据Data,并传给server。
  7. server 收到 秘钥B后,用自己的私钥 private1 解开了,得到了秘钥A。
  8. server 收到加密后的data 后,用秘钥A解开了,获得了元素数据。

简而言之,就是采用非对称加密+对称加密的方式。其中,对称加密产生的秘钥,是既可以加密,又可以解密的,加密解密速度很快。而采用非对称加密,则不可以,必须公钥解密私钥,或者私钥加密公钥,加解密速度慢。这样一个组合,就可以保障数据得到加密,又不会影响速度。

OK,知道了原理之后,我们看下具体在代码里如何实现。

首先,我们要生成server的 公钥public1私钥 private1。那就得用到openssl命令了。

需要注意的是Go在1.15版本,X509 无法使用了,需要用Sans算法代替, 具体的操作可参考这篇文章:Go1.15下Sans 秘钥生成过程

这样,我们就得到了2个key,1个是test.pem,它就是公钥。 1个是test.key,它是私钥。其中,我们设置openssl.cnf中alt_names为:

1
2
3
[ alt_names ]
DNS.1 = www.zchd.ltd
DNS.2 = www.test.zchd.ltd

顾明思意,设置的通用名称是这个。这个在client调用中会用到,不清楚先别急。

8.1 golang中使用tls加密

我们先在golang中的server 和clent 中使用tls,看看怎么做,首先是server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main
//采用https的token加密
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
"math/rand"
"net"
)
type UserServer struct{}
func main() {
//读2个证书
c, err := credentials.NewServerTLSFromFile("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem", "/Users/Jack/www/gowww/go-grpc-example/conf/test.key")
if err != nil {
log.Fatalf("new tls server err:", err.Error())
}
//监听端口
listen, err := net.Listen("tcp", "127.0.0.1:9528")
if err != nil {
log.Fatalf("tcp listen failed:%v", err)
}
//新建grpc服务,并且传入证书handle
server := grpc.NewServer(grpc.Creds(c))
fmt.Println("userServer grpc services start success")
//注册本次的UserServer 服务
proto.RegisterUserServerServer(server, &UserServer{})
_ = server.Serve(listen)
}
//保存用户
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) {
id := rand.Int31n(100) //随机生成id 模式保存成功
res := &proto.Id{Id: id}
fmt.Printf("%+v ", params.GetAge())
fmt.Printf("%+v\n", params.GetName())
return res, nil
}
func (Service *UserServer) GetUserInfo(ctx context.Context, id *proto.Id) (*proto.UserInfo, error) {
res := &proto.UserInfo{Id: id.GetId(), Name: "test", Age: 31}
return res, nil
}

我们可以发现,除了注册Grpc使用证书不同之外,其他的rpc函数和非tls上是一致的。我们看下client怎么写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"log"
)
var client proto.UserServerClient
func main() {
//读取证书和服务名
crt, err := credentials.NewClientTLSFromFile("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem", "www.zchd.ltd")
if err != nil {
panic(err.Error())
}
//监听端口,并传入证书handle
connect, err := grpc.Dial("127.0.0.1:9528", grpc.WithTransportCredentials(crt))
if err != nil {
log.Fatalln(err)
}
defer connect.Close()
//新建服务客户端
client = proto.NewUserServerClient(connect)
SaveUser()
//GetUserInfo()
}
func SaveUser() {
params := proto.UserParams{}
params.Age = &proto.Age{Age: 31}
params.Name = &proto.Name{Name: "test"}
res, err := client.SaveUser(context.Background(), &params)
if err != nil {
log.Fatalf("client.SaveUser err: %v", err)
}
fmt.Println(res.Id)
}
func GetUserInfo() {
res, err := client.GetUserInfo(context.Background(), &proto.Id{Id: 1})
if err != nil {
log.Fatalf("client.userInfo err: %v", err)
}
fmt.Printf("%+v\n", res)
}

代码总体也很简单,需要注意的是NewClientTLSFromFile()这个函数,第一个参数需要传pem公钥文件,第一个参数传serverNameOverride`,也就是我们在OpenSSL.cnf里面设置DNS的名字。

我们运行一下:

1
2
$ go run client/simple_token_client/client.go
47

服务端也有输出:

1
2
$ go run server/simple_tls_server/server.go
age:31 name:"test"

成功连接。需要注意的是2个证书的生成,涉及很多openssl命令,要注意别搞错了,这个搞错就很同意连接不成功,出现各种问题。

8.2 php client 使用tls加密 连接

老规矩,我们在PHP中的client,也可以用这种方式来加密连接一下服务端,直接上代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
<?php
//引入 composer 的自动载加
require __DIR__ . '/vendor/autoload.php';

//公钥内容
$pem = file_get_contents("/Users/Jack/www/gowww/go-grpc-example/conf/test.pem");

//导入公钥证书和DNS name名
$client = new \Proto\UserServerClient('127.0.0.1:9528', [
'credentials' => \Grpc\ChannelCredentials::createSsl($pem),
'grpc.ssl_target_name_override' => 'www.zchd.ltd',
]);

//实例化 $UserParams 请求类
$UserParams = new \Proto\UserParams();
$age = new \Proto\Age();
$age->setAge(18);
$UserParams->setAge($age);
$name = new \Proto\Name();
$name->setName("jack");
$UserParams->setName($name);

//调用远程服务
/**
* @var $Id \Proto\Id
*/
list($Id, $status) = $client->SaveUser($UserParams)->wait();
var_dump($status->code, $Id->getId());

//实例化Id类
$Id = new \Proto\Id();

//赋值
$Id->setId("1");

//调用
/**
* @var $User \Proto\UserInfo
*/
list($UserInfo, $status) = $client->GetUserInfo($Id)->wait();
var_dump($status->code, $UserInfo->getId(), $UserInfo->getName(), $UserInfo->getAge());

需要注意的是证书的导入和写法,有点却别。运行一下:

1
2
3
4
5
6
7
$ php main_tls.php
int(0)
int(59)
int(0)
int(1)
string(4) "test"
int(31)

成功了。

9. 超时控制

我们平时在代码中通过curl调用1个http请求的时候,都会设置timeout超时,这个是非常重要的,之前笔者就经历过1个接口没设置超时时间,由于1个接口读取时间很长,导致请求长时间等待,由于http请求堆积太多导致,服务线程池飙升,就导致节点死机。所以这是很严重的一个事情。

那么,我们再client中去调用grpc的服务请求的时候,也应该要设置超时时间,这个超时可以通过context来实现。

所以,核心是用到context这个包,来设置,有2种方式,都可以:

1
2
3
4
5
//设置超时时间为1秒
ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))

//更好的写法
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)

直接上client代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main
//超时控制
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"time"
)
var client proto.UserServerClient
var ctx context.Context
var cancel context.CancelFunc
func main() {
connect, err := grpc.Dial("127.0.0.1:9527", grpc.WithInsecure())
if err != nil {
log.Fatalln(err)
}
defer connect.Close()
//ctx, cancel = context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))
//另一种写法,1秒超时
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
client = proto.NewUserServerClient(connect)
SaveUser()
}
func SaveUser() {
params := proto.UserParams{}
params.Age = &proto.Age{Age: 31}
params.Name = &proto.Name{Name: "test"}
//打印当前时间
fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
//开始请求
res, err := client.SaveUser(ctx, &params)
fmt.Println(err)
if err != nil {
got := status.Code(err)
//客户端自己超时控制
if got == codes.DeadlineExceeded {
log.Println("client.SaveUser err: deadline")
}
log.Printf("client.SaveUser err: %+v", err)
} else {
fmt.Println(res.Id)
}
}

client先是设置了context的WithTimeout时间为1秒,然后判断调用grpc函数SaveUser的错误返回值,如果限制超时,就终止请求。

server端其实也需要对这个超时时间做及时的判断,因为,server端可能请求了很多协程服务,client已经停止了,那么server端也应该要及时的停止了,而不是还在后端运行和计算,这样也可以节省服务器很多资源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main
import (
"context"
"fmt"
"go-grpc-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"log"
"math/rand"
"net"
"time"
)
type UserServer struct{}
func main() {
listen, err := net.Listen("tcp", "127.0.0.1:9527")
if err != nil {
log.Fatalf("tcp listen failed:%v", err)
}
server := grpc.NewServer()
fmt.Println("userServer grpc services start success")
proto.RegisterUserServerServer(server, &UserServer{})
_ = server.Serve(listen)
}
//保存用户
func (Service *UserServer) SaveUser(ctx context.Context, params *proto.UserParams) (*proto.Id, error) {
time.Sleep(3*time.Second)
//检测是否超时
timeD, ok := ctx.Deadline()
if ok {
fmt.Println(timeD.Format("2006-01-02 15:04:05"), ctx.Err())
return nil, status.Errorf(codes.Canceled, "UserServer.SaveUser Deadline")
}
id := rand.Int31n(100) //随机生成id 模式保存成功
res := &proto.Id{Id: id}
fmt.Printf("%+v, ", params.GetAge())
fmt.Printf("%+v\n", params.GetName())
return res, nil
}

我们再server端,模拟了3秒超时。

ok,我们运行一下:

1
2
3
4
5
6
7
8
9
$ go run client/simple_timeout_client/client.go
2021-01-11 18:20:22
2021/01/11 18:20:23 client.SaveUser err: deadline
2021/01/11 18:20:23 client.SaveUser err: rpc error: code = DeadlineExceeded desc = context deadline exceeded

$ go run server/simple_timeout_server/server.go
userServer grpc services start success

2021-01-11 18:20:23 context deadline exceeded

可以看到,在1秒后,deadline了。成功!

当然,也可以用select + ctx.Done()的模式,来监听client的取消事件的:

关键代码如下:

1
2
3
4
5
select {
case <-ctx.Done():
fmt.Println("ctx.Done", ctx.Err())
return nil, status.Errorf(codes.Canceled, "UserServer.SaveUser Deadline")
}

参考文章:
1.https://eddycjy.com/posts/go/grpc/2018-09-22-install/
2.https://www.cnblogs.com/xiangxiaolin/p/12791281.html
3.https://github.com/eddycjy/go-grpc-example
4.php grpc 官方demo https://github.com/grpc/grpc/tree/v1.34.0/examples/php
5.Go 1.15 解决GRPC X509 https://blog.csdn.net/cuichenghd/article/details/109230584

转自:https://www.zybuluo.com/phper/note/1764719