mirror of
https://github.com/singchia/frontier.git
synced 2026-04-22 16:07:04 +08:00
Feat/adjust readme (#94)
* feat: adjust readme * feat: adjust readme * feat: adjust readme * feat: adjust readme * feat: adjust readme * feat: adjust readme * feat: adjust readme * feat: adjust readme
This commit is contained in:
+60
-848
@@ -9,24 +9,65 @@
|
||||
[](https://pkg.go.dev/github.com/singchia/frontier/api/dataplane/v1/service)
|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
|
||||
[简体中文](./README.md) | English
|
||||
English | [简体中文](./README_zh.md)
|
||||
|
||||
</div>
|
||||
|
||||
|
||||
Frontier is a full-duplex, open-source long-connection gateway developed in Go. It aims to allow microservices to directly reach edge nodes or clients, and vice versa. It provides full-duplex bidirectional RPC calls, message publishing and receiving, and point-to-point stream functionality for both. Frontier complies with cloud-native architecture, enabling quick deployment of a cluster using Operator, ensuring high availability and elasticity, and easily supporting the requirement of millions of online edge nodes or clients.
|
||||
# Frontier
|
||||
|
||||
Frontier is a full-duplex, open-source long-connection gateway written in Go. It enables microservices to directly reach edge nodes or clients, and vice versa. It provides full-duplex bidirectional RPC, messaging, and point-to-point streams. Frontier follows cloud-native architecture principles, supports fast cluster deployment via Operator, and is built for high availability and elastic scaling to millions of online edge nodes or clients.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
- [Features](#features)
|
||||
- [Quick Start](#quick-start)
|
||||
- [Architecture](#architecture)
|
||||
- [Usage](#usage)
|
||||
- [Configuration](#configuration)
|
||||
- [Deployment](#deployment)
|
||||
- [Cluster](#cluster)
|
||||
- [Kubernetes](#kubernetes)
|
||||
- [Development](#development)
|
||||
- [Testing](#testing)
|
||||
- [Community](#community)
|
||||
- [License](#license)
|
||||
|
||||
## Quick Start
|
||||
|
||||
1. Run a single Frontier instance:
|
||||
|
||||
```bash
|
||||
docker run -d --name frontier -p 30011:30011 -p 30012:30012 singchia/frontier:1.1.0
|
||||
```
|
||||
|
||||
2. Build and run examples:
|
||||
|
||||
```bash
|
||||
make examples
|
||||
```
|
||||
|
||||
Run the chatroom example:
|
||||
|
||||
```bash
|
||||
# Terminal 1
|
||||
./bin/chatroom_service
|
||||
|
||||
# Terminal 2
|
||||
./bin/chatroom_agent
|
||||
```
|
||||
|
||||
Demo video: https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
|
||||
|
||||
## Features
|
||||
|
||||
- **RPC**: Microservices and edges can call each other's functions (pre-registered), with load balancing supported on the microservice side.
|
||||
- **Messaging**: Microservices and edges can publish to each other's topics, and edges can publish to external MQ topics, with load balancing supported on the microservice side.
|
||||
- **Multiplexing/Streams**: Microservices can directly open a stream (connection) on edge nodes, enabling functions like file upload and proxy, turning obstacles into pathways.
|
||||
- **Online/Offline Control**: Microservices can register to get edge node IDs, and callbacks for online/offline events. When these events occur, Frontier will invoke these functions.
|
||||
- **Simple API**: The project provides well-packaged SDKs in the api directory for both edges and microservices, making development based on this SDK very simple.
|
||||
- **Easy Deployment**: Supports various deployment methods (docker, docker-compose, helm, and operator) to deploy Frontier instances or clusters.
|
||||
- **Horizontal Scaling**: Provides Frontier and Frontlas clusters. When single instance performance reaches a bottleneck, you can horizontally scale Frontier instances or clusters.
|
||||
- **High Availability**: Supports cluster deployment and allows microservices and edge nodes to permanently reconnect using the SDK. In case of current instance failure, it switches to a new available instance to continue services.
|
||||
- **Control Plane Support**: Provides gRPC and REST interfaces, allowing operation personnel to query or delete microservices and edge nodes. Deletion will force the target offline.
|
||||
- **Bidirectional RPC**: Services and edges can call each other with load balancing.
|
||||
- **Messaging**: Topic-based publish/receive between services, edges, and external MQ.
|
||||
- **Point-to-Point Streams**: Open direct streams for proxying, file transfer, and custom traffic.
|
||||
- **Cloud-Native Deployment**: Run via Docker, Compose, Helm, or Operator.
|
||||
- **High Availability and Scaling**: Support for reconnect, clustering, and horizontal scale with Frontlas.
|
||||
- **Auth and Presence**: Edge auth and online/offline notifications.
|
||||
- **Control Plane APIs**: gRPC and REST APIs for querying and managing online nodes.
|
||||
|
||||
|
||||
## Architecture
|
||||
@@ -121,780 +162,13 @@ Frontier requires both microservices and edge nodes to actively connect to Front
|
||||
|
||||
## Usage
|
||||
|
||||
### Example
|
||||
Detailed usage guide: [docs/USAGE.md](./docs/USAGE.md)
|
||||
|
||||
In the [examples/chatroom](./examples/chatroom) directory, there is a simple chatroom example implemented in just 100 lines of code. You can get the executable programs chatroom\_service and chatroom\_agent by running:
|
||||
## Configuration
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
Detailed configuration guide: [docs/CONFIGURATION.md](./docs/CONFIGURATION.md)
|
||||
|
||||
Run the example:
|
||||
|
||||
https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
|
||||
|
||||
In this example, you can see features like online/offline notifications and message publishing.
|
||||
|
||||
**Live Streaming**
|
||||
|
||||
In the [examples/rtmp](./examples/rtmp) directory, there is a simple live streaming example implemented in just 80 lines of code. You can get the executable programs `rtmp_service` and `rtmp_edge` by running:
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
|
||||
After running, use [OBS](https://obsproject.com/) to connect to `rtmp_edge` for live streaming proxy:
|
||||
|
||||
<img src="./docs/diagram/rtmp.png" width="100%">
|
||||
|
||||
In this example, you can see Multiplexer and Stream functionality.
|
||||
|
||||
### How microservice use
|
||||
|
||||
**Getting Service on the Microservice Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, err := service.NewService(dialer)
|
||||
// Start using the service
|
||||
}
|
||||
```
|
||||
|
||||
**Receiving ID, Online/Offline Notifications on Microservice Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
svc.RegisterGetEdgeID(context.TODO(), getID)
|
||||
svc.RegisterEdgeOnline(context.TODO(), online)
|
||||
svc.RegisterEdgeOffline(context.TODO(), offline)
|
||||
}
|
||||
|
||||
// The service can assign IDs to edges based on metadata
|
||||
func getID(meta []byte) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Edge goes online
|
||||
func online(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Edge goes offline
|
||||
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Publishing Messages to Edge Nodes**:
|
||||
|
||||
The edge must be online beforehand, otherwise the edge cannot be found.
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
msg := svc.NewMessage([]byte("test"))
|
||||
// Publish a message to the edge node with ID 1001
|
||||
err := svc.Publish(context.TODO(), 1001, msg)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Declaring Topic to Receive**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the topic to receive when getting the service
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
|
||||
for {
|
||||
// Receive messages
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Calling Edge Node RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
req := svc.NewRequest([]byte("test"))
|
||||
// Call the "foo" method on the edge node with ID 1001. The edge node must have pre-registered this method.
|
||||
rsp, err := svc.Call(context.TODO(), 1001, "foo", req)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Registering Methods for Edge Nodes to Call**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// Register an "echo" method
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Opening Point-to-Point Stream on Edge Node**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// Open a new stream to the edge node with ID 1001 (st is also a net.Conn). The edge must accept the stream with AcceptStream.
|
||||
st, err := svc.OpenStream(context.TODO(), 1001)
|
||||
}
|
||||
```
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Microservice Receives Stream**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the service name when getting the service, required when the edge opens a stream to specify the service name.
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}
|
||||
```
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Messages, RPC, and Streams Together!**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the service name when getting the service, required when the edge opens a stream to specify the service name.
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
|
||||
// Receive streams
|
||||
go func() {
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}()
|
||||
|
||||
// Register an "echo" method
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
|
||||
// Receive messages
|
||||
for {
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
### How edge use
|
||||
|
||||
**Getting Edge on the Edge Node Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Publishes Message to Topic**:
|
||||
|
||||
The service needs to declare receiving the topic in advance, or configure an external MQ in the configuration file.
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg
|
||||
msg := eg.NewMessage([]byte("test"))
|
||||
err := eg.Publish(context.TODO(), "foo", msg)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Receives Messages**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
// Receive messages
|
||||
msg, err := eg.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of eg has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Calls RPC on Microservice**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg
|
||||
req := eg.NewRequest([]byte("test"))
|
||||
// Call the "echo" method. Frontier will look up and forward the request to the corresponding microservice.
|
||||
rsp, err := eg.Call(context.TODO(), "echo", req)
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Registers RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Register an "echo" method
|
||||
eg.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Opens Point-to-Point Stream to Microservice**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
st, err := eg.OpenStream("service-name")
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Edge Node Receives Stream**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"fmt"
|
||||
"io"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
stream, err := eg.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of eg has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Error Handling
|
||||
|
||||
<table><thead>
|
||||
<tr>
|
||||
<th>Error</th>
|
||||
<th>Description and Handling</th>
|
||||
</tr></thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>io.EOF</td>
|
||||
<td>Receiving EOF indicates that the stream or connection has been closed, and you need to exit operations such as Receive and AcceptStream.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>io.ErrShortBuffer</td>
|
||||
<td>The buffer on the sender or receiver is full. You can adjust the buffer size by setting OptionServiceBufferSize or OptionEdgeBufferSize.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ErrEdgeNotOnline</td>
|
||||
<td>This indicates that the edge node is not online, and you need to check the edge connection.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ServiceNotOnline</td>
|
||||
<td>This indicates that the microservice is not online, and you need to check the microservice connection information or connection.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.RPCNotOnline</td>
|
||||
<td>This indicates that the RPC called is not online.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.TopicNotOnline</td>
|
||||
<td>This indicates that the topic to be published is not online.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Other Errors</td>
|
||||
<td>There are also errors like Timeout, BufferFull, etc.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
It should be noted that if the stream is closed, any blocking methods on the stream will immediately receive io.EOF. If the entry point (Service and Edge) is closed, all streams on it will immediately receive io.EOF for blocking methods.
|
||||
|
||||
### Controlplane
|
||||
|
||||
The Frontier control plane provides gRPC and REST interfaces. Operators can use these APIs to determine the connection status of the current instance. Both gRPC and REST are served on the default port :`30010`.
|
||||
|
||||
**GRPC** See[Protobuf Definition](./api/controlplane/frontier/v1/controlplane.proto)
|
||||
|
||||
```protobuf
|
||||
service ControlPlane {
|
||||
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
|
||||
rpc GetEdge(GetEdgeRequest) returns (Edge);
|
||||
rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse);
|
||||
rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse);
|
||||
rpc ListServices(ListServicesRequest) returns (ListServicesResponse);
|
||||
rpc GetService(GetServiceRequest) returns (Service);
|
||||
rpc KickService(KickServiceRequest) returns (KickServiceResponse);
|
||||
rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse);
|
||||
rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse);
|
||||
}
|
||||
```
|
||||
|
||||
REST Swagger definition can be found at [Swagger Definition](./docs/swagger/swagger.yaml)
|
||||
|
||||
For example, you can use the following request to kick an edge node offline:
|
||||
|
||||
```
|
||||
curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id}
|
||||
```
|
||||
|
||||
Or check which RPCs a microservice has registered:
|
||||
|
||||
|
||||
```
|
||||
curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}
|
||||
```
|
||||
|
||||
Note: gRPC/REST depends on the DAO backend, with two options: ```buntdb``` and ```sqlite3```. Both use in-memory mode. For performance considerations, the default backend uses buntdb, and the count field in the list interface always returns -1. When you configure the backend to ```sqlite3```, it means you have a strong OLTP requirement for connected microservices and edge nodes on Frontier, such as encapsulating the web on Frontier. In this case, the count will return the total number.
|
||||
|
||||
## Frontier Configuration
|
||||
|
||||
If you need to further customize your Frontier instance, you can learn how various configurations work in this section. Customize your configuration, save it as ```frontier.yaml```, and mount it to the container at ```/usr/conf/frontier.yaml``` to take effect.
|
||||
|
||||
### Minimal Configuration
|
||||
|
||||
To get started, you can simply configure the service listening addresses for microservices and edge nodes:
|
||||
|
||||
```yaml
|
||||
# Microservice configuration
|
||||
servicebound:
|
||||
# Listening network
|
||||
listen:
|
||||
network: tcp
|
||||
# Listening address
|
||||
addr: 0.0.0.0:30011
|
||||
# Edge node configuration
|
||||
edgebound:
|
||||
# Listening network
|
||||
listen:
|
||||
network: tcp
|
||||
# Listening address
|
||||
addr: 0.0.0.0:30012
|
||||
# Whether to allow Frontier to allocate edgeID if no ID service is registered
|
||||
edgeid_alloc_when_no_idservice_on: true
|
||||
```
|
||||
|
||||
### TLS
|
||||
|
||||
TLS configuration is supported for microservices, edge nodes, and control planes. mTLS is also supported, where Frontier verifies the client certificate.
|
||||
|
||||
```yaml
|
||||
servicebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30011
|
||||
network: tcp
|
||||
tls:
|
||||
# Whether to enable TLS, default is disabled
|
||||
enable: false
|
||||
# Certificates and private keys, multiple pairs of certificates are allowed for client negotiation
|
||||
certs:
|
||||
- cert: servicebound.cert
|
||||
key: servicebound.key
|
||||
# Whether to enable mTLS, client certificates will be verified by the following CA
|
||||
mtls: false
|
||||
# CA certificates for verifying client certificates
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
edgebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30012
|
||||
network: tcp
|
||||
tls:
|
||||
# Whether to enable TLS, default is disabled
|
||||
enable: false
|
||||
# Certificates and private keys, multiple pairs of certificates are allowed for client negotiation
|
||||
certs:
|
||||
- cert: edgebound.cert
|
||||
key: edgebound.key
|
||||
insecure_skip_verify: false
|
||||
# Whether to enable mTLS, client certificates will be verified by the following CA
|
||||
mtls: false
|
||||
# CA certificates for verifying client certificates
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
```
|
||||
|
||||
### External MQ
|
||||
|
||||
If you need to configure an external MQ, Frontier supports publishing the corresponding topic to these MQs.
|
||||
|
||||
**AMQP**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
amqp:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# AMQP addresses
|
||||
addrs: null
|
||||
# Producer
|
||||
producer:
|
||||
# Exchange name
|
||||
exchange: ""
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
routing_keys: null
|
||||
```
|
||||
|
||||
For AMQP, the above is the minimal configuration. If the topic of the message published by the edge node is in `routing_keys`, Frontier will publish to the `exchange.` If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Kafka**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
kafka:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# Kafka addresses
|
||||
addrs: null
|
||||
# Producer
|
||||
# Array values
|
||||
topics: null
|
||||
```
|
||||
|
||||
For Kafka, the above is the minimal configuration. If the topic of the message published by the edge node is in the above array, Frontier will publish it. If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**NATS**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nats:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# NATS addresses
|
||||
addrs: null
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
subjects: null
|
||||
# If Jetstream is allowed, it will be prioritized for publishing
|
||||
jetstream:
|
||||
enable: false
|
||||
# Jetstream name
|
||||
name: ""
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
subjects: null
|
||||
```
|
||||
|
||||
In NATS configuration, if Jetstream is allowed, it will be prioritized for publishing. If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**NSQ**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nsq:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# NSQ addresses
|
||||
addrs: null
|
||||
producer:
|
||||
# Array values
|
||||
topics: null
|
||||
```
|
||||
In NSQ's topics, if there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Redis**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
redis:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# Redis addresses
|
||||
addrs: null
|
||||
# Redis DB
|
||||
db: 0
|
||||
# Password
|
||||
password: ""
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
channels: null
|
||||
```
|
||||
|
||||
If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Other Configurations**
|
||||
|
||||
```yaml
|
||||
daemon:
|
||||
# Whether to enable PProf
|
||||
pprof:
|
||||
addr: 0.0.0.0:6060
|
||||
cpu_profile_rate: 0
|
||||
enable: true
|
||||
# Resource limits
|
||||
rlimit:
|
||||
enable: true
|
||||
nofile: 102400
|
||||
# Control plane enable
|
||||
controlplane:
|
||||
enable: false
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:30010
|
||||
dao:
|
||||
# Supports buntdb and sqlite3, both use in-memory mode to remain stateless
|
||||
backend: buntdb
|
||||
# SQLite debug enable
|
||||
debug: false
|
||||
exchange:
|
||||
# Frontier forwards edge node messages, RPCs, and open streams to microservices based on hash strategy: edgeid, srcip, or random, default is edgeid.
|
||||
# That is, the same edge node will always request the same microservice.
|
||||
hashby: edgeid
|
||||
```
|
||||
|
||||
For more detailed configurations, see [frontier_all.yaml](./etc/frontier_all.yaml).
|
||||
|
||||
## Frontier Deployment
|
||||
## Deployment
|
||||
|
||||
In a single Frontier instance, you can choose the following methods to deploy your Frontier instance based on your environment.
|
||||
|
||||
@@ -924,73 +198,11 @@ helm install frontier ./ -f values.yaml
|
||||
|
||||
Your microservice should connect to ```service/frontier-servicebound-svc:30011```, and your edge node can connect to the NodePort where `:30012` is located.
|
||||
|
||||
### systemd
|
||||
### Systemd
|
||||
|
||||
If you need to run Frontier as a service on a Linux system, you can deploy it using systemd.
|
||||
Use the dedicated Systemd docs:
|
||||
|
||||
#### Quick Installation
|
||||
|
||||
```bash
|
||||
# Use Makefile to install systemd service (recommended)
|
||||
sudo make install-systemd
|
||||
|
||||
# Enable and start the service
|
||||
sudo systemctl enable frontier
|
||||
sudo systemctl start frontier
|
||||
```
|
||||
|
||||
Or install manually:
|
||||
|
||||
```bash
|
||||
# Build frontier binary
|
||||
make frontier
|
||||
|
||||
# Run installation script with root privileges
|
||||
sudo ./dist/systemd/install.sh
|
||||
|
||||
# Enable and start the service
|
||||
sudo systemctl enable frontier
|
||||
sudo systemctl start frontier
|
||||
```
|
||||
|
||||
#### Service Management
|
||||
|
||||
```bash
|
||||
# Check service status
|
||||
sudo systemctl status frontier
|
||||
|
||||
# View real-time logs
|
||||
sudo journalctl -u frontier -f
|
||||
|
||||
# Restart service
|
||||
sudo systemctl restart frontier
|
||||
|
||||
# Stop service
|
||||
sudo systemctl stop frontier
|
||||
```
|
||||
|
||||
#### Configuration Notes
|
||||
|
||||
- **Service User**: Runs as dedicated `frontier` user for improved security
|
||||
- **Auto Restart**: Automatically restarts when service exits abnormally
|
||||
- **Port Configuration**: Default listens on ports 30011 (microservice) and 30012 (edge node)
|
||||
- **Configuration File**: `/usr/conf/frontier.yaml`
|
||||
- **Log Management**: Outputs to systemd journal
|
||||
|
||||
#### Uninstallation
|
||||
|
||||
```bash
|
||||
# Use Makefile to uninstall systemd service (recommended)
|
||||
sudo make uninstall-systemd
|
||||
```
|
||||
|
||||
Or uninstall manually:
|
||||
|
||||
```bash
|
||||
sudo ./dist/systemd/uninstall.sh
|
||||
```
|
||||
|
||||
For more detailed information, please refer to [dist/systemd/README.md](./dist/systemd/README.md)
|
||||
[dist/systemd/README.md](./dist/systemd/README.md)
|
||||
|
||||
### Operator
|
||||
|
||||
@@ -998,7 +210,7 @@ See the cluster deployment section below.
|
||||
|
||||
## Cluster
|
||||
|
||||
### Frontier + Frontlas
|
||||
### Frontier + Frontlas Architecture
|
||||
|
||||
<img src="./docs/diagram/frontlas.png" width="100%">
|
||||
|
||||
|
||||
+472
@@ -0,0 +1,472 @@
|
||||
<p align=center>
|
||||
<img src="./docs/diagram/frontier-logo.png" width="30%">
|
||||
</p>
|
||||
|
||||
<div align="center">
|
||||
|
||||
[](https://github.com/singchia/frontier/actions/workflows/go.yml)
|
||||
[](https://goreportcard.com/report/github.com/singchia/frontier)
|
||||
[](https://pkg.go.dev/github.com/singchia/frontier/api/dataplane/v1/service)
|
||||
[](https://opensource.org/licenses/Apache-2.0)
|
||||
|
||||
[English](./README.md) | 简体中文
|
||||
|
||||
</div>
|
||||
|
||||
# Frontier
|
||||
|
||||
Frontier是一个go开发的全双工开源长连接网关,旨在让微服务直达边缘节点或客户端,反之边缘节点或客户端也同样直达微服务。对于两者,提供了全双工的单双向RPC调用,消息发布和接收,以及点对点流的功能。Frontier符合云原生架构,可以使用Operator快速部署一个集群,具有高可用和弹性,轻松支撑百万边缘节点或客户端在线的需求。
|
||||
|
||||
## 目录
|
||||
|
||||
- [特性](#特性)
|
||||
- [快速开始](#快速开始)
|
||||
- [架构](#架构)
|
||||
- [使用](#使用)
|
||||
- [配置](#配置)
|
||||
- [部署](#部署)
|
||||
- [集群](#集群)
|
||||
- [Kubernetes](#kubernetes)
|
||||
- [开发](#开发)
|
||||
- [测试](#测试)
|
||||
- [社区](#社区)
|
||||
- [许可证](#许可证)
|
||||
|
||||
## 快速开始
|
||||
|
||||
1. 启动单实例 Frontier:
|
||||
|
||||
```bash
|
||||
docker run -d --name frontier -p 30011:30011 -p 30012:30012 singchia/frontier:1.1.0
|
||||
```
|
||||
|
||||
2. 构建并运行示例:
|
||||
|
||||
```bash
|
||||
make examples
|
||||
```
|
||||
|
||||
运行 chatroom 示例:
|
||||
|
||||
```bash
|
||||
# 终端 1
|
||||
./bin/chatroom_service
|
||||
|
||||
# 终端 2
|
||||
./bin/chatroom_agent
|
||||
```
|
||||
|
||||
演示视频: https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
|
||||
|
||||
|
||||
## 特性
|
||||
|
||||
- **RPC** 微服务和边缘可以Call对方的函数(提前注册),并且在微服务侧支持负载均衡
|
||||
- **消息** 微服务和边缘可以Publish对方的Topic,边缘可以Publish到外部MQ的Topic,微服务侧支持负载均衡
|
||||
- **多路复用/流** 微服务可以直接在边缘节点打开一个流(连接),可以封装例如文件上传、代理等,天堑变通途
|
||||
- **上线离线控制** 微服务可以注册边缘节点获取ID、上线离线回调,当这些事件发生,Frontier会调用这些函数
|
||||
- **API简单** 在项目api目录下,分别对边缘和微服务提供了封装好的sdk,可以非常简单的基于这个sdk做开发
|
||||
- **部署简单** 支持多种部署方式(docker docker-compose helm以及operator)来部署Frontier实例或集群
|
||||
- **水平扩展** 提供了Frontiter和Frontlas集群,在单实例性能达到瓶颈下,可以水平扩展Frontier实例或集群
|
||||
- **高可用** 支持集群部署,支持微服务和边缘节点永久重连sdk,在当前实例宕机情况时,切换新可用实例继续服务
|
||||
- **支持控制面** 提供了gRPC和rest接口,允许运维人员对微服务和边缘节点查询或删除,删除即踢除目标下线
|
||||
|
||||
## 架构
|
||||
|
||||
### 组件Frontier
|
||||
|
||||
<img src="./docs/diagram/frontier.png" width="100%">
|
||||
|
||||
|
||||
- _Service End_:微服务侧的功能入口,默认连接
|
||||
- _Edge End_:边缘节点或客户端侧的功能入口
|
||||
- _Publish/Receive_:发布和接收消息
|
||||
- _Call/Register_:调用和注册函数
|
||||
- _OpenStream/AcceptStream_:打开和接收点到点流(连接)
|
||||
- _外部MQ_:Frontier支持将从边缘节点Publish的消息根据配置的Topic转发到外部MQ
|
||||
|
||||
Frontier需要微服务和边缘节点两方都主动连接到Frontier,Service和Edge的元信息(接收Topic,RPC,Service名等)可以在连接的时候携带过来。连接的默认端口是:
|
||||
|
||||
- ```:30011``` 提供给微服务连接,获取Service
|
||||
- ```:30012``` 提供给边缘节点连接,获取Edge
|
||||
- ```:30010``` 提供给运维人员或者程序使用的控制面
|
||||
|
||||
|
||||
### 功能
|
||||
|
||||
<table><thead>
|
||||
<tr>
|
||||
<th>功能</th>
|
||||
<th>发起方</th>
|
||||
<th>接收方</th>
|
||||
<th>方法</th>
|
||||
<th>路由方式</th>
|
||||
<th>描述</th>
|
||||
</tr></thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td rowspan="2">Messager</td>
|
||||
<td>Service</td>
|
||||
<td>Edge</td>
|
||||
<td>Publish</td>
|
||||
<td>EdgeID+Topic</td>
|
||||
<td>必须Publish到具体的EdgeID,默认Topic为空,Edge调用Receive接收,接收处理完成后必须调用msg.Done()或msg.Error(err)保障消息一致性</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Edge</td>
|
||||
<td>Service或外部MQ</td>
|
||||
<td>Publish</td>
|
||||
<td>Topic</td>
|
||||
<td>必须Publish到Topic,由Frontier根据Topic选择某个Service或MQ</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td rowspan="2">RPCer</td>
|
||||
<td>Service</td>
|
||||
<td>Edge</td>
|
||||
<td>Call</td>
|
||||
<td>EdgeID+Method</td>
|
||||
<td>必须Call到具体的EdgeID,需要携带Method</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Edge</td>
|
||||
<td>Service</td>
|
||||
<td>Call</td>
|
||||
<td>Method</td>
|
||||
<td>必须Call到Method,由Frontier根据Method选择某个的Service</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td rowspan="2">Multiplexer</td>
|
||||
<td>Service</td>
|
||||
<td>Edge</td>
|
||||
<td>OpenStream</td>
|
||||
<td>EdgeID</td>
|
||||
<td>必须OpenStream到具体的EdgeID</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Edge</td>
|
||||
<td>Service</td>
|
||||
<td>OpenStream</td>
|
||||
<td>ServiceName</td>
|
||||
<td>必须OpenStream到ServiceName,该ServiceName由Service初始化时携带的service.OptionServiceName指定</td>
|
||||
</tr>
|
||||
</tbody></table>
|
||||
|
||||
主要遵守以下设计原则:
|
||||
|
||||
1. 所有的消息、RPC和Stream都是点到点的传递
|
||||
- 从微服务到边缘,一定要指定边缘节点ID
|
||||
- 从边缘到微服务,Frontier根据Topic和Method路由,最终哈希选择一个微服务或外部MQ,默认根据```edgeid```哈希,你也可以选择```random```或```srcip```
|
||||
2. 消息需要接收方明确结束
|
||||
- 为了保障消息的传达语义,接收方一定需要msg.Done()或msg.Error(err),保障传达一致性
|
||||
3. Multiplexer打开的流在逻辑上是微服务与边缘节点的直接通信
|
||||
- 对方接收到流后,所有在这个流上功能都会直达对方,不会经过Frontierd的路由策略
|
||||
|
||||
|
||||
## 使用
|
||||
|
||||
详细使用文档: [docs/USAGE_zh.md](./docs/USAGE_zh.md)
|
||||
|
||||
## 配置
|
||||
|
||||
详细配置文档: [docs/CONFIGURATION_zh.md](./docs/CONFIGURATION_zh.md)
|
||||
|
||||
## 部署
|
||||
|
||||
在单Frontier实例下,可以根据环境选择以下方式部署你的Frontier实例。
|
||||
|
||||
### docker
|
||||
|
||||
```
|
||||
docker run -d --name frontier -p 30011:30011 -p 30012:30012 singchia/frontier:1.1.0
|
||||
```
|
||||
|
||||
|
||||
### docker-compose
|
||||
|
||||
```
|
||||
git clone https://github.com/singchia/frontier.git
|
||||
cd dist/compose
|
||||
docker-compose up -d frontier
|
||||
```
|
||||
|
||||
### helm
|
||||
|
||||
如果你是在k8s环境下,可以使用helm快速部署一个实例
|
||||
|
||||
```
|
||||
git clone https://github.com/singchia/frontier.git
|
||||
cd dist/helm
|
||||
helm install frontier ./ -f values.yaml
|
||||
```
|
||||
|
||||
你的微服务应该连接`service/frontier-servicebound-svc:30011`,你的边缘节点可以连接`:30012`所在的NodePort。
|
||||
|
||||
### Systemd
|
||||
|
||||
使用独立的 Systemd 文档:
|
||||
|
||||
[dist/systemd/README_cn.md](./dist/systemd/README_cn.md)
|
||||
|
||||
### operator
|
||||
|
||||
见下面集群部署章节
|
||||
|
||||
## 集群
|
||||
|
||||
### Frontier + Frontlas架构
|
||||
|
||||
<img src="./docs/diagram/frontlas.png" width="100%">
|
||||
|
||||
新增Frontlas组件用于构建集群,Frontlas同样也是无状态组件,并不在内存里留存其他信息,因此需要额外依赖Redis,你需要提供一个Redis连接信息给到Frontlas,支持 ```redis``` ```sentinel```和```redis-cluster```。
|
||||
|
||||
- _Frontier_:微服务和边缘数据面通信组件
|
||||
- _Frontlas_:命名取自Frontier Atlas,集群管理组件,将微服务和边缘的元信息、活跃信息记录在Redis里
|
||||
|
||||
Frontier需要主动连接Frontlas以上报自己、微服务和边缘的活跃和状态,默认Frontlas的端口是:
|
||||
|
||||
- ```:40011``` 提供给微服务连接,代替微服务在单Frontier实例下连接的30011端口
|
||||
- ```:40012``` 提供给Frontier连接,上报状态
|
||||
|
||||
你可以根据需要部署任意多个Frontier实例,而对于Frontlas,分开部署两个即可保障HA(高可用),因为不存储状态没有一致性问题。
|
||||
|
||||
### 配置
|
||||
|
||||
**Frontier**的frontier.yaml需要添加如下配置:
|
||||
|
||||
```yaml
|
||||
frontlas:
|
||||
enable: true
|
||||
dial:
|
||||
network: tcp
|
||||
addr:
|
||||
- 127.0.0.1:40012
|
||||
tls:
|
||||
metrics:
|
||||
enable: false
|
||||
interval: 0
|
||||
daemon:
|
||||
# Frontier集群内的唯一ID
|
||||
frontier_id: frontier01
|
||||
```
|
||||
Frontier需要连接Frontlas,用来上报自己、微服务和边缘的活跃和状态。
|
||||
|
||||
|
||||
**Frontlas**的frontlas.yaml最小化配置:
|
||||
|
||||
```yaml
|
||||
control_plane:
|
||||
listen:
|
||||
# 微服务改连接这个地址,用来发现集群的边缘节点所在的Frontier
|
||||
network: tcp
|
||||
addr: 0.0.0.0:40011
|
||||
frontier_plane:
|
||||
# Frontier连接这个地址
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:40012
|
||||
expiration:
|
||||
# 微服务在redis内元信息的过期时间
|
||||
service_meta: 30
|
||||
# 边缘节点在redis内元信息的过期时间
|
||||
edge_meta: 30
|
||||
redis:
|
||||
# 支持连接standalone、sentinel和cluster
|
||||
mode: standalone
|
||||
standalone:
|
||||
network: tcp
|
||||
addr: redis:6379
|
||||
db: 0
|
||||
```
|
||||
|
||||
更多详细配置见 [frontlas_all.yaml](./etc/frontlas_all.yaml)
|
||||
|
||||
### 使用
|
||||
|
||||
由于使用Frontlas来发现可用的Frontier,因此微服务需要做出调整如下:
|
||||
|
||||
**微服务获取Service**
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 改使用NewClusterService来获取Service
|
||||
svc, err := service.NewClusterService("127.0.0.1:40011")
|
||||
// 开始使用service,其他一切保持不变
|
||||
}
|
||||
```
|
||||
|
||||
**边缘节点获取连接地址**
|
||||
|
||||
对于边缘节点来说,依然连接Frontier,不过可以从Frontlas来获取可用的Frontier地址,Frontlas提供了列举Frontier实例接口:
|
||||
|
||||
```
|
||||
curl -X http://127.0.0.1:40011/cluster/v1/frontiers
|
||||
```
|
||||
你可以在这个接口上封装一下,提供给边缘节点做负载均衡或者高可用,或加上mTLS直接提供给边缘节点(不建议)。
|
||||
|
||||
**控制面GRPC** 详见[Protobuf定义](./api/controlplane/frontlas/v1/cluster.proto)
|
||||
|
||||
Frontlas控制面与Frontier不同,是面向集群的控制面,目前只提供了读取集群的接口
|
||||
|
||||
```protobuf
|
||||
service ClusterService {
|
||||
rpc GetFrontierByEdge(GetFrontierByEdgeIDRequest) returns (GetFrontierByEdgeIDResponse);
|
||||
rpc ListFrontiers(ListFrontiersRequest) returns (ListFrontiersResponse);
|
||||
|
||||
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
|
||||
rpc GetEdgeByID(GetEdgeByIDRequest) returns (GetEdgeByIDResponse);
|
||||
rpc GetEdgesCount(GetEdgesCountRequest) returns (GetEdgesCountResponse);
|
||||
|
||||
rpc ListServices(ListServicesRequest) returns (ListServicesResponse) ;
|
||||
rpc GetServiceByID(GetServiceByIDRequest) returns (GetServiceByIDResponse) ;
|
||||
rpc GetServicesCount(GetServicesCountRequest) returns (GetServicesCountResponse) ;
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Kubernetes
|
||||
|
||||
### Operator
|
||||
|
||||
**安装CRD和Operator**
|
||||
|
||||
按照以下步骤安装和部署Operator到你的.kubeconfig环境中:
|
||||
|
||||
```
|
||||
git clone https://github.com/singchia/frontier.git
|
||||
cd dist/crd
|
||||
kubectl apply -f install.yaml
|
||||
```
|
||||
|
||||
查看CRD:
|
||||
|
||||
```
|
||||
kubectl get crd frontierclusters.frontier.singchia.io
|
||||
```
|
||||
|
||||
查看Operator:
|
||||
|
||||
```
|
||||
kubectl get all -n frontier-system
|
||||
```
|
||||
|
||||
**FrontierCluster集群**
|
||||
|
||||
```yaml
|
||||
apiVersion: frontier.singchia.io/v1alpha1
|
||||
kind: FrontierCluster
|
||||
metadata:
|
||||
labels:
|
||||
app.kubernetes.io/name: frontiercluster
|
||||
app.kubernetes.io/managed-by: kustomize
|
||||
name: frontiercluster
|
||||
spec:
|
||||
frontier:
|
||||
# 单实例Frontier
|
||||
replicas: 2
|
||||
# 微服务侧端口
|
||||
servicebound:
|
||||
port: 30011
|
||||
# 边缘节点侧端口
|
||||
edgebound:
|
||||
port: 30012
|
||||
frontlas:
|
||||
# 单实例Frontlas
|
||||
replicas: 1
|
||||
# 控制面端口
|
||||
controlplane:
|
||||
port: 40011
|
||||
redis:
|
||||
# 依赖的Redis配置
|
||||
addrs:
|
||||
- rfs-redisfailover:26379
|
||||
password: your-password
|
||||
masterName: mymaster
|
||||
redisType: sentinel
|
||||
```
|
||||
|
||||
保存为`frontiercluster.yaml`,执行
|
||||
|
||||
```
|
||||
kubectl apply -f frontiercluster.yaml
|
||||
```
|
||||
|
||||
1分钟,你即可拥有一个2实例Frontier+1实例Frontlas的集群。
|
||||
|
||||
通过一下来检查资源部署情况
|
||||
|
||||
> kubectl get all -l app=frontiercluster-frontier
|
||||
> kubectl get all -l app=frontiercluster-frontlas
|
||||
|
||||
|
||||
```
|
||||
NAME READY STATUS RESTARTS AGE
|
||||
pod/frontiercluster-frontier-57d565c89-dn6n8 1/1 Running 0 7m22s
|
||||
pod/frontiercluster-frontier-57d565c89-nmwmt 1/1 Running 0 7m22s
|
||||
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
|
||||
service/frontiercluster-edgebound-svc NodePort 10.233.23.174 <none> 30012:30012/TCP 8m7s
|
||||
service/frontiercluster-servicebound-svc ClusterIP 10.233.29.156 <none> 30011/TCP 8m7s
|
||||
NAME READY UP-TO-DATE AVAILABLE AGE
|
||||
deployment.apps/frontiercluster-frontier 2/2 2 2 7m22s
|
||||
NAME DESIRED CURRENT READY AGE
|
||||
replicaset.apps/frontiercluster-frontier-57d565c89 2 2 2 7m22s
|
||||
```
|
||||
|
||||
```
|
||||
NAME READY STATUS RESTARTS AGE
|
||||
pod/frontiercluster-frontlas-85c4fb6d9b-5clkh 1/1 Running 0 8m11s
|
||||
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
|
||||
service/frontiercluster-frontlas-svc ClusterIP 10.233.0.23 <none> 40011/TCP,40012/TCP 8m11s
|
||||
NAME READY UP-TO-DATE AVAILABLE AGE
|
||||
deployment.apps/frontiercluster-frontlas 1/1 1 1 8m11s
|
||||
NAME DESIRED CURRENT READY AGE
|
||||
replicaset.apps/frontiercluster-frontlas-85c4fb6d9b 1 1 1 8m11s
|
||||
```
|
||||
|
||||
你的微服务应该连接`service/frontiercluster-frontlas-svc:40011`,你的边缘节点可以连接`:30012`所在的NodePort。
|
||||
|
||||
## 开发
|
||||
|
||||
### 路线图
|
||||
|
||||
详见 [ROADMAP](./ROADMAP.md)
|
||||
|
||||
### 贡献
|
||||
|
||||
如果你发现任何Bug,请提出Issue,项目Maintainers会及时响应相关问题。
|
||||
|
||||
如果你希望能够提交Feature,更快速解决项目问题,满足以下简单条件下欢迎提交PR:
|
||||
|
||||
* 代码风格保持一致
|
||||
* 每次提交一个Feature
|
||||
* 提交的代码都携带单元测试
|
||||
|
||||
|
||||
## 测试
|
||||
|
||||
### 流功能测试
|
||||
|
||||
<img src="./docs/diagram/stream.png" width="100%">
|
||||
|
||||
|
||||
## 社区
|
||||
|
||||
<p align=center>
|
||||
<img src="./docs/diagram/wechat.JPG" width="30%">
|
||||
</p>
|
||||
|
||||
添加以加入微信群组
|
||||
|
||||
## 许可证
|
||||
|
||||
Released under the [Apache License 2.0](https://github.com/singchia/geminio/blob/main/LICENSE)
|
||||
|
||||
---
|
||||
|
||||
已经看到这里,点个Star⭐️吧♥️
|
||||
Vendored
+82
-81
@@ -1,122 +1,123 @@
|
||||
# Frontier Systemd Service
|
||||
|
||||
这个目录包含了将Frontier作为systemd服务运行的配置文件。
|
||||
This directory contains files for running Frontier as a systemd service.
|
||||
|
||||
## 文件说明
|
||||
## Files
|
||||
|
||||
- `frontier.service` - systemd服务配置文件
|
||||
- `install.sh` - 自动安装脚本
|
||||
- `uninstall.sh` - 自动卸载脚本
|
||||
- `README.md` - 本说明文件
|
||||
- `frontier.service` - systemd service unit file
|
||||
- `install.sh` - automated installation script
|
||||
- `uninstall.sh` - automated uninstallation script
|
||||
- `README.md` - English documentation
|
||||
- `README_cn.md` - Chinese documentation
|
||||
|
||||
## 快速安装
|
||||
## Quick Install
|
||||
|
||||
### 方法1: 使用Makefile(推荐)
|
||||
### Method 1: Makefile (Recommended)
|
||||
|
||||
```bash
|
||||
# 使用Makefile安装systemd服务
|
||||
# Install systemd service via Makefile
|
||||
sudo make install-systemd
|
||||
```
|
||||
|
||||
### 方法2: 使用安装脚本
|
||||
### Method 2: Install Script
|
||||
|
||||
```bash
|
||||
# 构建frontier二进制文件
|
||||
# Build frontier binary
|
||||
make frontier
|
||||
|
||||
# 以root权限运行安装脚本
|
||||
# Run install script as root
|
||||
sudo ./dist/systemd/install.sh
|
||||
```
|
||||
|
||||
### 方法3: 手动安装
|
||||
### Method 3: Manual Install
|
||||
|
||||
1. 构建并安装frontier:
|
||||
1. Build and install frontier:
|
||||
```bash
|
||||
make install-frontier
|
||||
```
|
||||
|
||||
2. 创建frontier用户:
|
||||
2. Create user:
|
||||
```bash
|
||||
sudo useradd --system --no-create-home --shell /bin/false frontier
|
||||
```
|
||||
|
||||
3. 创建必要的目录:
|
||||
3. Create required directories:
|
||||
```bash
|
||||
sudo mkdir -p /var/log/frontier /var/lib/frontier
|
||||
sudo chown -R frontier:frontier /var/log/frontier /var/lib/frontier
|
||||
```
|
||||
|
||||
4. 安装systemd服务:
|
||||
4. Install service:
|
||||
```bash
|
||||
sudo cp dist/systemd/frontier.service /etc/systemd/system/
|
||||
sudo systemctl daemon-reload
|
||||
```
|
||||
|
||||
## 服务管理
|
||||
## Service Management
|
||||
|
||||
### 启用和启动服务
|
||||
### Enable and Start
|
||||
|
||||
```bash
|
||||
# 启用服务(开机自启)
|
||||
# Enable on boot
|
||||
sudo systemctl enable frontier
|
||||
|
||||
# 启动服务
|
||||
# Start service
|
||||
sudo systemctl start frontier
|
||||
|
||||
# 检查服务状态
|
||||
# Check status
|
||||
sudo systemctl status frontier
|
||||
```
|
||||
|
||||
### 服务控制
|
||||
### Control
|
||||
|
||||
```bash
|
||||
# 启动服务
|
||||
# Start
|
||||
sudo systemctl start frontier
|
||||
|
||||
# 停止服务
|
||||
# Stop
|
||||
sudo systemctl stop frontier
|
||||
|
||||
# 重启服务
|
||||
# Restart
|
||||
sudo systemctl restart frontier
|
||||
|
||||
# 重新加载配置
|
||||
# Reload config
|
||||
sudo systemctl reload frontier
|
||||
```
|
||||
|
||||
### 查看日志
|
||||
### Logs
|
||||
|
||||
```bash
|
||||
# 查看实时日志
|
||||
# Follow logs
|
||||
sudo journalctl -u frontier -f
|
||||
|
||||
# 查看最近的日志
|
||||
# Last 100 lines
|
||||
sudo journalctl -u frontier -n 100
|
||||
|
||||
# 查看特定时间段的日志
|
||||
# Time range
|
||||
sudo journalctl -u frontier --since "2024-01-01" --until "2024-01-02"
|
||||
```
|
||||
|
||||
## 配置说明
|
||||
## Configuration Notes
|
||||
|
||||
### 服务配置特性
|
||||
### Service Features
|
||||
|
||||
- **用户隔离**: 以`frontier`用户运行,提高安全性
|
||||
- **自动重启**: 服务异常退出时自动重启
|
||||
- **资源限制**: 设置了文件描述符和进程数限制
|
||||
- **安全设置**: 启用了多种安全保护措施
|
||||
- **日志管理**: 输出到systemd journal
|
||||
- **User isolation**: runs as dedicated `frontier` user
|
||||
- **Auto restart**: restarts automatically on unexpected exits
|
||||
- **Resource limits**: includes file descriptor and process limits
|
||||
- **Hardening**: includes common systemd security settings
|
||||
- **Logging**: logs to systemd journal
|
||||
|
||||
### 端口配置
|
||||
### Ports
|
||||
|
||||
Frontier默认监听以下端口:
|
||||
- `30011` - Service bound端口
|
||||
- `30012` - Edge bound端口
|
||||
Default listen ports:
|
||||
- `30011` - Service bound
|
||||
- `30012` - Edge bound
|
||||
|
||||
确保防火墙允许这些端口的访问。
|
||||
Make sure these ports are allowed by firewall rules.
|
||||
|
||||
### 配置文件
|
||||
### Config File
|
||||
|
||||
服务使用`/usr/conf/frontier.yaml`作为配置文件。你可以根据需要修改配置:
|
||||
The service uses `/usr/conf/frontier.yaml` by default:
|
||||
|
||||
```yaml
|
||||
edgebound:
|
||||
@@ -130,86 +131,86 @@ servicebound:
|
||||
addr: 0.0.0.0:30011
|
||||
```
|
||||
|
||||
## 卸载
|
||||
## Uninstall
|
||||
|
||||
### 使用Makefile(推荐)
|
||||
### Makefile (Recommended)
|
||||
|
||||
```bash
|
||||
sudo make uninstall-systemd
|
||||
```
|
||||
|
||||
### 使用卸载脚本
|
||||
### Script
|
||||
|
||||
```bash
|
||||
sudo ./dist/systemd/uninstall.sh
|
||||
```
|
||||
|
||||
### 手动卸载
|
||||
### Manual
|
||||
|
||||
```bash
|
||||
# 停止并禁用服务
|
||||
# Stop and disable service
|
||||
sudo systemctl stop frontier
|
||||
sudo systemctl disable frontier
|
||||
|
||||
# 删除服务文件
|
||||
# Remove service file
|
||||
sudo rm /etc/systemd/system/frontier.service
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
# 删除用户(可选)
|
||||
# Remove user (optional)
|
||||
sudo userdel frontier
|
||||
|
||||
# 删除目录(可选)
|
||||
# Remove directories (optional)
|
||||
sudo rm -rf /var/log/frontier /var/lib/frontier
|
||||
```
|
||||
|
||||
## 故障排除
|
||||
## Troubleshooting
|
||||
|
||||
### 常见问题
|
||||
### Common Issues
|
||||
|
||||
1. **服务启动失败**
|
||||
- 检查二进制文件是否存在:`ls -la /usr/bin/frontier`
|
||||
- 检查配置文件是否存在:`ls -la /usr/conf/frontier.yaml`
|
||||
- 查看详细错误:`sudo journalctl -u frontier -n 50`
|
||||
1. **Service failed to start**
|
||||
- Check binary: `ls -la /usr/bin/frontier`
|
||||
- Check config: `ls -la /usr/conf/frontier.yaml`
|
||||
- Check logs: `sudo journalctl -u frontier -n 50`
|
||||
|
||||
2. **端口被占用**
|
||||
- 检查端口使用情况:`sudo netstat -tlnp | grep -E ':(30011|30012)'`
|
||||
- 修改配置文件中的端口设置
|
||||
2. **Port conflicts**
|
||||
- Check ports: `sudo netstat -tlnp | grep -E ':(30011|30012)'`
|
||||
- Update listen ports in config
|
||||
|
||||
3. **权限问题**
|
||||
- 确保frontier用户存在:`id frontier`
|
||||
- 检查目录权限:`ls -la /var/log/frontier /var/lib/frontier`
|
||||
3. **Permission issues**
|
||||
- Verify user: `id frontier`
|
||||
- Check dir permissions: `ls -la /var/log/frontier /var/lib/frontier`
|
||||
|
||||
### 调试模式
|
||||
|
||||
如果需要调试,可以临时修改服务配置:
|
||||
### Debug Mode
|
||||
|
||||
```bash
|
||||
sudo systemctl edit frontier
|
||||
```
|
||||
|
||||
添加以下内容:
|
||||
Add:
|
||||
|
||||
```ini
|
||||
[Service]
|
||||
ExecStart=
|
||||
ExecStart=/usr/bin/frontier --config /usr/conf/frontier.yaml -v 2
|
||||
```
|
||||
|
||||
然后重启服务:
|
||||
Then restart:
|
||||
|
||||
```bash
|
||||
sudo systemctl restart frontier
|
||||
```
|
||||
|
||||
## 安全注意事项
|
||||
## Security Notes
|
||||
|
||||
1. 服务以非特权用户运行
|
||||
2. 启用了多种systemd安全特性
|
||||
3. 建议定期更新frontier版本
|
||||
4. 监控服务日志以发现异常活动
|
||||
5. 确保配置文件权限正确(644)
|
||||
1. Run service as non-privileged user
|
||||
2. Keep systemd hardening enabled
|
||||
3. Update Frontier regularly
|
||||
4. Monitor journal logs
|
||||
5. Keep config file permissions strict
|
||||
|
||||
## 支持
|
||||
## Support
|
||||
|
||||
如果遇到问题,请:
|
||||
1. 查看systemd日志:`journalctl -u frontier`
|
||||
2. 检查frontier项目文档
|
||||
3. 提交issue到项目仓库
|
||||
If you run into issues:
|
||||
1. Check systemd logs: `journalctl -u frontier`
|
||||
2. Check Frontier docs
|
||||
3. Open an issue in the repository
|
||||
|
||||
Vendored
+216
@@ -0,0 +1,216 @@
|
||||
# Frontier Systemd Service
|
||||
|
||||
这个目录包含了将Frontier作为systemd服务运行的配置文件。
|
||||
|
||||
## 文件说明
|
||||
|
||||
- `frontier.service` - systemd服务配置文件
|
||||
- `install.sh` - 自动安装脚本
|
||||
- `uninstall.sh` - 自动卸载脚本
|
||||
- `README.md` - 英文说明文档
|
||||
- `README_cn.md` - 中文说明文档
|
||||
|
||||
## 快速安装
|
||||
|
||||
### 方法1: 使用Makefile(推荐)
|
||||
|
||||
```bash
|
||||
# 使用Makefile安装systemd服务
|
||||
sudo make install-systemd
|
||||
```
|
||||
|
||||
### 方法2: 使用安装脚本
|
||||
|
||||
```bash
|
||||
# 构建frontier二进制文件
|
||||
make frontier
|
||||
|
||||
# 以root权限运行安装脚本
|
||||
sudo ./dist/systemd/install.sh
|
||||
```
|
||||
|
||||
### 方法3: 手动安装
|
||||
|
||||
1. 构建并安装frontier:
|
||||
```bash
|
||||
make install-frontier
|
||||
```
|
||||
|
||||
2. 创建frontier用户:
|
||||
```bash
|
||||
sudo useradd --system --no-create-home --shell /bin/false frontier
|
||||
```
|
||||
|
||||
3. 创建必要的目录:
|
||||
```bash
|
||||
sudo mkdir -p /var/log/frontier /var/lib/frontier
|
||||
sudo chown -R frontier:frontier /var/log/frontier /var/lib/frontier
|
||||
```
|
||||
|
||||
4. 安装systemd服务:
|
||||
```bash
|
||||
sudo cp dist/systemd/frontier.service /etc/systemd/system/
|
||||
sudo systemctl daemon-reload
|
||||
```
|
||||
|
||||
## 服务管理
|
||||
|
||||
### 启用和启动服务
|
||||
|
||||
```bash
|
||||
# 启用服务(开机自启)
|
||||
sudo systemctl enable frontier
|
||||
|
||||
# 启动服务
|
||||
sudo systemctl start frontier
|
||||
|
||||
# 检查服务状态
|
||||
sudo systemctl status frontier
|
||||
```
|
||||
|
||||
### 服务控制
|
||||
|
||||
```bash
|
||||
# 启动服务
|
||||
sudo systemctl start frontier
|
||||
|
||||
# 停止服务
|
||||
sudo systemctl stop frontier
|
||||
|
||||
# 重启服务
|
||||
sudo systemctl restart frontier
|
||||
|
||||
# 重新加载配置
|
||||
sudo systemctl reload frontier
|
||||
```
|
||||
|
||||
### 查看日志
|
||||
|
||||
```bash
|
||||
# 查看实时日志
|
||||
sudo journalctl -u frontier -f
|
||||
|
||||
# 查看最近的日志
|
||||
sudo journalctl -u frontier -n 100
|
||||
|
||||
# 查看特定时间段的日志
|
||||
sudo journalctl -u frontier --since "2024-01-01" --until "2024-01-02"
|
||||
```
|
||||
|
||||
## 配置说明
|
||||
|
||||
### 服务配置特性
|
||||
|
||||
- **用户隔离**: 以`frontier`用户运行,提高安全性
|
||||
- **自动重启**: 服务异常退出时自动重启
|
||||
- **资源限制**: 设置了文件描述符和进程数限制
|
||||
- **安全设置**: 启用了多种安全保护措施
|
||||
- **日志管理**: 输出到systemd journal
|
||||
|
||||
### 端口配置
|
||||
|
||||
Frontier默认监听以下端口:
|
||||
- `30011` - Service bound端口
|
||||
- `30012` - Edge bound端口
|
||||
|
||||
确保防火墙允许这些端口的访问。
|
||||
|
||||
### 配置文件
|
||||
|
||||
服务使用`/usr/conf/frontier.yaml`作为配置文件。你可以根据需要修改配置:
|
||||
|
||||
```yaml
|
||||
edgebound:
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:30012
|
||||
edgeid_alloc_when_no_idservice_on: true
|
||||
servicebound:
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:30011
|
||||
```
|
||||
|
||||
## 卸载
|
||||
|
||||
### 使用Makefile(推荐)
|
||||
|
||||
```bash
|
||||
sudo make uninstall-systemd
|
||||
```
|
||||
|
||||
### 使用卸载脚本
|
||||
|
||||
```bash
|
||||
sudo ./dist/systemd/uninstall.sh
|
||||
```
|
||||
|
||||
### 手动卸载
|
||||
|
||||
```bash
|
||||
# 停止并禁用服务
|
||||
sudo systemctl stop frontier
|
||||
sudo systemctl disable frontier
|
||||
|
||||
# 删除服务文件
|
||||
sudo rm /etc/systemd/system/frontier.service
|
||||
sudo systemctl daemon-reload
|
||||
|
||||
# 删除用户(可选)
|
||||
sudo userdel frontier
|
||||
|
||||
# 删除目录(可选)
|
||||
sudo rm -rf /var/log/frontier /var/lib/frontier
|
||||
```
|
||||
|
||||
## 故障排除
|
||||
|
||||
### 常见问题
|
||||
|
||||
1. **服务启动失败**
|
||||
- 检查二进制文件是否存在:`ls -la /usr/bin/frontier`
|
||||
- 检查配置文件是否存在:`ls -la /usr/conf/frontier.yaml`
|
||||
- 查看详细错误:`sudo journalctl -u frontier -n 50`
|
||||
|
||||
2. **端口被占用**
|
||||
- 检查端口使用情况:`sudo netstat -tlnp | grep -E ':(30011|30012)'`
|
||||
- 修改配置文件中的端口设置
|
||||
|
||||
3. **权限问题**
|
||||
- 确保frontier用户存在:`id frontier`
|
||||
- 检查目录权限:`ls -la /var/log/frontier /var/lib/frontier`
|
||||
|
||||
### 调试模式
|
||||
|
||||
如果需要调试,可以临时修改服务配置:
|
||||
|
||||
```bash
|
||||
sudo systemctl edit frontier
|
||||
```
|
||||
|
||||
添加以下内容:
|
||||
```ini
|
||||
[Service]
|
||||
ExecStart=
|
||||
ExecStart=/usr/bin/frontier --config /usr/conf/frontier.yaml -v 2
|
||||
```
|
||||
|
||||
然后重启服务:
|
||||
```bash
|
||||
sudo systemctl restart frontier
|
||||
```
|
||||
|
||||
## 安全注意事项
|
||||
|
||||
1. 服务以非特权用户运行
|
||||
2. 启用了多种systemd安全特性
|
||||
3. 建议定期更新frontier版本
|
||||
4. 监控服务日志以发现异常活动
|
||||
5. 确保配置文件权限正确(644)
|
||||
|
||||
## 支持
|
||||
|
||||
如果遇到问题,请:
|
||||
1. 查看systemd日志:`journalctl -u frontier`
|
||||
2. 检查frontier项目文档
|
||||
3. 提交issue到项目仓库
|
||||
@@ -0,0 +1,197 @@
|
||||
## Configuration
|
||||
|
||||
If you need to further customize your Frontier instance, you can learn how various configurations work in this section. Customize your configuration, save it as ```frontier.yaml```, and mount it to the container at ```/usr/conf/frontier.yaml``` to take effect.
|
||||
|
||||
### Minimal Configuration
|
||||
|
||||
To get started, you can simply configure the service listening addresses for microservices and edge nodes:
|
||||
|
||||
```yaml
|
||||
# Microservice configuration
|
||||
servicebound:
|
||||
# Listening network
|
||||
listen:
|
||||
network: tcp
|
||||
# Listening address
|
||||
addr: 0.0.0.0:30011
|
||||
# Edge node configuration
|
||||
edgebound:
|
||||
# Listening network
|
||||
listen:
|
||||
network: tcp
|
||||
# Listening address
|
||||
addr: 0.0.0.0:30012
|
||||
# Whether to allow Frontier to allocate edgeID if no ID service is registered
|
||||
edgeid_alloc_when_no_idservice_on: true
|
||||
```
|
||||
|
||||
### TLS
|
||||
|
||||
TLS configuration is supported for microservices, edge nodes, and control planes. mTLS is also supported, where Frontier verifies the client certificate.
|
||||
|
||||
```yaml
|
||||
servicebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30011
|
||||
network: tcp
|
||||
tls:
|
||||
# Whether to enable TLS, default is disabled
|
||||
enable: false
|
||||
# Certificates and private keys, multiple pairs of certificates are allowed for client negotiation
|
||||
certs:
|
||||
- cert: servicebound.cert
|
||||
key: servicebound.key
|
||||
# Whether to enable mTLS, client certificates will be verified by the following CA
|
||||
mtls: false
|
||||
# CA certificates for verifying client certificates
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
edgebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30012
|
||||
network: tcp
|
||||
tls:
|
||||
# Whether to enable TLS, default is disabled
|
||||
enable: false
|
||||
# Certificates and private keys, multiple pairs of certificates are allowed for client negotiation
|
||||
certs:
|
||||
- cert: edgebound.cert
|
||||
key: edgebound.key
|
||||
insecure_skip_verify: false
|
||||
# Whether to enable mTLS, client certificates will be verified by the following CA
|
||||
mtls: false
|
||||
# CA certificates for verifying client certificates
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
```
|
||||
|
||||
### External MQ
|
||||
|
||||
If you need to configure an external MQ, Frontier supports publishing the corresponding topic to these MQs.
|
||||
|
||||
**AMQP**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
amqp:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# AMQP addresses
|
||||
addrs: null
|
||||
# Producer
|
||||
producer:
|
||||
# Exchange name
|
||||
exchange: ""
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
routing_keys: null
|
||||
```
|
||||
|
||||
For AMQP, the above is the minimal configuration. If the topic of the message published by the edge node is in `routing_keys`, Frontier will publish to the `exchange.` If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Kafka**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
kafka:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# Kafka addresses
|
||||
addrs: null
|
||||
# Producer
|
||||
# Array values
|
||||
topics: null
|
||||
```
|
||||
|
||||
For Kafka, the above is the minimal configuration. If the topic of the message published by the edge node is in the above array, Frontier will publish it. If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**NATS**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nats:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# NATS addresses
|
||||
addrs: null
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
subjects: null
|
||||
# If Jetstream is allowed, it will be prioritized for publishing
|
||||
jetstream:
|
||||
enable: false
|
||||
# Jetstream name
|
||||
name: ""
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
subjects: null
|
||||
```
|
||||
|
||||
In NATS configuration, if Jetstream is allowed, it will be prioritized for publishing. If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**NSQ**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nsq:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# NSQ addresses
|
||||
addrs: null
|
||||
producer:
|
||||
# Array values
|
||||
topics: null
|
||||
```
|
||||
In NSQ's topics, if there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Redis**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
redis:
|
||||
# Whether to allow
|
||||
enable: false
|
||||
# Redis addresses
|
||||
addrs: null
|
||||
# Redis DB
|
||||
db: 0
|
||||
# Password
|
||||
password: ""
|
||||
producer:
|
||||
# Equivalent to Frontier's internal topic concept, array values
|
||||
channels: null
|
||||
```
|
||||
|
||||
If there are also microservices or other external MQs that declare the topic, Frontier will still choose one to publish based on hashby.
|
||||
|
||||
**Other Configurations**
|
||||
|
||||
```yaml
|
||||
daemon:
|
||||
# Whether to enable PProf
|
||||
pprof:
|
||||
addr: 0.0.0.0:6060
|
||||
cpu_profile_rate: 0
|
||||
enable: true
|
||||
# Resource limits
|
||||
rlimit:
|
||||
enable: true
|
||||
nofile: 102400
|
||||
# Control plane enable
|
||||
controlplane:
|
||||
enable: false
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:30010
|
||||
dao:
|
||||
# Supports buntdb and sqlite3, both use in-memory mode to remain stateless
|
||||
backend: buntdb
|
||||
# SQLite debug enable
|
||||
debug: false
|
||||
exchange:
|
||||
# Frontier forwards edge node messages, RPCs, and open streams to microservices based on hash strategy: edgeid, srcip, or random, default is edgeid.
|
||||
# That is, the same edge node will always request the same microservice.
|
||||
hashby: edgeid
|
||||
```
|
||||
|
||||
For more detailed configurations, see [frontier_all.yaml](../etc/frontier_all.yaml).
|
||||
|
||||
@@ -0,0 +1,195 @@
|
||||
## 配置
|
||||
|
||||
如果需要更近一步定制你的Frontier实例,可以在这一节了解各个配置是如何工作的。定制完你的配置,保存为```frontier.yaml```,挂载到容器```/usr/conf/frontier.yaml```位置生效。
|
||||
|
||||
### 最小化配置
|
||||
|
||||
简单起,你可以仅配置面向微服务和边缘节点的服务监听地址:
|
||||
|
||||
```yaml
|
||||
# 微服务端配置
|
||||
servicebound:
|
||||
# 监听网络
|
||||
listen:
|
||||
network: tcp
|
||||
# 监听地址
|
||||
addr: 0.0.0.0:30011
|
||||
# 边缘节点端配置
|
||||
edgebound:
|
||||
# 监听网络
|
||||
listen:
|
||||
network: tcp
|
||||
# 监听地址
|
||||
addr: 0.0.0.0:30012
|
||||
# 找不到注册的GetEdgeID时,是否允许Frontier分配edgeID
|
||||
edgeid_alloc_when_no_idservice_on: true
|
||||
```
|
||||
|
||||
### TLS
|
||||
|
||||
对于用户来说,比较重要的TLS配置在微服务、边缘节点和控制面都是支持的,另支持mTLS,Frontier由此校验客户端携带的证书。
|
||||
|
||||
```yaml
|
||||
servicebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30011
|
||||
network: tcp
|
||||
tls:
|
||||
# 是否开启TLS,默认不开启
|
||||
enable: false
|
||||
# 证书和私钥,允许配置多对证书,由客户端协商确定
|
||||
certs:
|
||||
- cert: servicebound.cert
|
||||
key: servicebound.key
|
||||
# 是否启用mtls,启动会校验客户端携带的证书是否由下面的CA签发
|
||||
mtls: false
|
||||
# CA证书,用于校验客户端证书
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
edgebound:
|
||||
listen:
|
||||
addr: 0.0.0.0:30012
|
||||
network: tcp
|
||||
tls:
|
||||
# 是否开启TLS,默认不开启
|
||||
enable: false
|
||||
# 证书和私钥,允许配置多对证书,由客户端协商确定
|
||||
certs:
|
||||
- cert: edgebound.cert
|
||||
key: edgebound.key
|
||||
insecure_skip_verify: false
|
||||
# 是否启用mtls,启动会校验客户端携带的证书是否由下面的CA签发
|
||||
mtls: false
|
||||
# CA证书,用于校验客户端证书
|
||||
ca_certs:
|
||||
- ca1.cert
|
||||
```
|
||||
|
||||
### 外部MQ
|
||||
|
||||
如果你需要配置外部MQ,Frontier也支持将相应的Topic转Publish到这些MQ。
|
||||
|
||||
**AMQP**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
amqp:
|
||||
# 是否允许
|
||||
enable: false
|
||||
# AMQP地址
|
||||
addrs: null
|
||||
# 生产者
|
||||
producer:
|
||||
# exchange名
|
||||
exchange: ""
|
||||
# 等于Frontier内Topic的概念,数组值
|
||||
routing_keys: null
|
||||
```
|
||||
对于AMQP来说,以上是最小配置,边缘节点Publish的消息Topic如果在routing_keys内,Frontier会Publish到exchange中,如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。
|
||||
|
||||
**Kafka**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
kafka:
|
||||
# 是否允许
|
||||
enable: false
|
||||
# kafka地址
|
||||
addrs: null
|
||||
# 生产者
|
||||
producer:
|
||||
# 数组值
|
||||
topics: null
|
||||
```
|
||||
对于Kafka来说,以上是最小配置,边缘节点Publish的消息Topic如果在上面数组中,Frontier会Publish过来。如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。
|
||||
|
||||
**NATS**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nats:
|
||||
# 是否允许
|
||||
enable: false
|
||||
# NATS地址
|
||||
addrs: null
|
||||
producer:
|
||||
# 等于Frontier内Topic的概念,数组值
|
||||
subjects: null
|
||||
# 如果允许jetstream,会优先Publish到jetstream
|
||||
jetstream:
|
||||
enable: false
|
||||
# jetstream名
|
||||
name: ""
|
||||
producer:
|
||||
# 等于Frontier内Topic的概念,数组值
|
||||
subjects: null
|
||||
```
|
||||
NATS配置里,如果允许Jetstream,会优先使用Publish到Jetstream。如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。
|
||||
|
||||
**NSQ**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
nsq:
|
||||
# 是否允许
|
||||
enable: false
|
||||
# NSQ地址
|
||||
addrs: null
|
||||
producer:
|
||||
# 数组值
|
||||
topics: null
|
||||
```
|
||||
NSQ的Topic里,如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。
|
||||
|
||||
**Redis**
|
||||
|
||||
```yaml
|
||||
mqm:
|
||||
redis:
|
||||
# 是否允许
|
||||
enable: false
|
||||
# Redis地址
|
||||
addrs: null
|
||||
# Redis DB
|
||||
db: 0
|
||||
# 密码
|
||||
password: ""
|
||||
producer:
|
||||
# 等于Frontier内Topic的概念,数组值
|
||||
channels: null
|
||||
```
|
||||
如果还有微服务或其他外部MQ也声明了该Topic,Frontier仍然会按照hashby来选择一个Publish。
|
||||
|
||||
|
||||
### 其他配置
|
||||
|
||||
```yaml
|
||||
daemon:
|
||||
# 是否开启PProf
|
||||
pprof:
|
||||
addr: 0.0.0.0:6060
|
||||
cpu_profile_rate: 0
|
||||
enable: true
|
||||
# 资源限制
|
||||
rlimit:
|
||||
enable: true
|
||||
nofile: 102400
|
||||
# 控制面开启
|
||||
controlplane:
|
||||
enable: false
|
||||
listen:
|
||||
network: tcp
|
||||
addr: 0.0.0.0:30010
|
||||
dao:
|
||||
# 支持buntdb和sqlite3,都使用的in-memory模式,保持无状态
|
||||
backend: buntdb
|
||||
# sqlite debug开启
|
||||
debug: false
|
||||
exchange:
|
||||
# Frontier根据edgeid srcip或random的哈希策略转发边缘节点的消息、RPC和打开流到微服务,默认edgeid
|
||||
# 即相同的边缘节点总是会请求到相同的微服务。
|
||||
hashby: edgeid
|
||||
```
|
||||
|
||||
更多详细配置见 [frontier_all.yaml](../etc/frontier_all.yaml)
|
||||
|
||||
+578
@@ -0,0 +1,578 @@
|
||||
## Usage
|
||||
|
||||
### Examples
|
||||
|
||||
In the [examples/chatroom](../examples/chatroom) directory, there is a simple chatroom example implemented in just 100 lines of code. You can get the executable programs chatroom\_service and chatroom\_agent by running:
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
|
||||
Run the example:
|
||||
|
||||
https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
|
||||
|
||||
In this example, you can see features like online/offline notifications and message publishing.
|
||||
|
||||
**Live Streaming**
|
||||
|
||||
In the [examples/rtmp](../examples/rtmp) directory, there is a simple live streaming example implemented in just 80 lines of code. You can get the executable programs `rtmp_service` and `rtmp_edge` by running:
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
|
||||
After running, use [OBS](https://obsproject.com/) to connect to `rtmp_edge` for live streaming proxy:
|
||||
|
||||
<img src="./diagram/rtmp.png" width="100%">
|
||||
|
||||
In this example, you can see Multiplexer and Stream functionality.
|
||||
|
||||
### Using Frontier in Microservices
|
||||
|
||||
**Getting Service on the Microservice Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, err := service.NewService(dialer)
|
||||
// Start using the service
|
||||
}
|
||||
```
|
||||
|
||||
**Receiving ID, Online/Offline Notifications on Microservice Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
svc.RegisterGetEdgeID(context.TODO(), getID)
|
||||
svc.RegisterEdgeOnline(context.TODO(), online)
|
||||
svc.RegisterEdgeOffline(context.TODO(), offline)
|
||||
}
|
||||
|
||||
// The service can assign IDs to edges based on metadata
|
||||
func getID(meta []byte) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// Edge goes online
|
||||
func online(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Edge goes offline
|
||||
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Publishing Messages to Edge Nodes**:
|
||||
|
||||
The edge must be online beforehand, otherwise the edge cannot be found.
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
msg := svc.NewMessage([]byte("test"))
|
||||
// Publish a message to the edge node with ID 1001
|
||||
err := svc.Publish(context.TODO(), 1001, msg)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Declaring Topic to Receive**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the topic to receive when getting the service
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
|
||||
for {
|
||||
// Receive messages
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Calling Edge Node RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
req := svc.NewRequest([]byte("test"))
|
||||
// Call the "foo" method on the edge node with ID 1001. The edge node must have pre-registered this method.
|
||||
rsp, err := svc.Call(context.TODO(), 1001, "foo", req)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Registering Methods for Edge Nodes to Call**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// Register an "echo" method
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**Microservice Opening Point-to-Point Stream on Edge Node**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// Open a new stream to the edge node with ID 1001 (st is also a net.Conn). The edge must accept the stream with AcceptStream.
|
||||
st, err := svc.OpenStream(context.TODO(), 1001)
|
||||
}
|
||||
```
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Microservice Receives Stream**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the service name when getting the service, required when the edge opens a stream to specify the service name.
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}
|
||||
```
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Messages, RPC, and Streams Together!**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// Declare the service name when getting the service, required when the edge opens a stream to specify the service name.
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
|
||||
// Receive streams
|
||||
go func() {
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}()
|
||||
|
||||
// Register an "echo" method
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
|
||||
// Receive messages
|
||||
for {
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of the service has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
### Using Frontier on Edge Nodes
|
||||
|
||||
**Getting Edge on the Edge Node Side**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Publishes Message to Topic**:
|
||||
|
||||
The service needs to declare receiving the topic in advance, or configure an external MQ in the configuration file.
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg
|
||||
msg := eg.NewMessage([]byte("test"))
|
||||
err := eg.Publish(context.TODO(), "foo", msg)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Receives Messages**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
// Receive messages
|
||||
msg, err := eg.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of eg has ended and it can no longer be used
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// After processing the message, notify the caller it is done
|
||||
msg.Done()
|
||||
}
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Calls RPC on Microservice**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Start using eg
|
||||
req := eg.NewRequest([]byte("test"))
|
||||
// Call the "echo" method. Frontier will look up and forward the request to the corresponding microservice.
|
||||
rsp, err := eg.Call(context.TODO(), "echo", req)
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Registers RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// Register an "echo" method
|
||||
eg.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**Edge Node Opens Point-to-Point Stream to Microservice**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
st, err := eg.OpenStream("service-name")
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
Based on this newly opened stream, you can transfer files, proxy traffic, etc.
|
||||
|
||||
**Edge Node Receives Stream**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"fmt"
|
||||
"io"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
stream, err := eg.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// Receiving EOF indicates the lifecycle of eg has ended and it can no longer be used
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// Use the stream. This stream is also a net.Conn. You can Read/Write/Close, and also use RPC and messaging.
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Error Handling
|
||||
|
||||
<table><thead>
|
||||
<tr>
|
||||
<th>Error</th>
|
||||
<th>Description and Handling</th>
|
||||
</tr></thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>io.EOF</td>
|
||||
<td>Receiving EOF indicates that the stream or connection has been closed, and you need to exit operations such as Receive and AcceptStream.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>io.ErrShortBuffer</td>
|
||||
<td>The buffer on the sender or receiver is full. You can adjust the buffer size by setting OptionServiceBufferSize or OptionEdgeBufferSize.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ErrEdgeNotOnline</td>
|
||||
<td>This indicates that the edge node is not online, and you need to check the edge connection.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ServiceNotOnline</td>
|
||||
<td>This indicates that the microservice is not online, and you need to check the microservice connection information or connection.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.RPCNotOnline</td>
|
||||
<td>This indicates that the RPC called is not online.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.TopicNotOnline</td>
|
||||
<td>This indicates that the topic to be published is not online.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>Other Errors</td>
|
||||
<td>There are also errors like Timeout, BufferFull, etc.</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
It should be noted that if the stream is closed, any blocking methods on the stream will immediately receive io.EOF. If the entry point (Service and Edge) is closed, all streams on it will immediately receive io.EOF for blocking methods.
|
||||
|
||||
### Control Plane
|
||||
|
||||
The Frontier control plane provides gRPC and REST interfaces. Operators can use these APIs to determine the connection status of the current instance. Both gRPC and REST are served on the default port :`30010`.
|
||||
|
||||
**GRPC** See[Protobuf Definition](../api/controlplane/frontier/v1/controlplane.proto)
|
||||
|
||||
```protobuf
|
||||
service ControlPlane {
|
||||
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
|
||||
rpc GetEdge(GetEdgeRequest) returns (Edge);
|
||||
rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse);
|
||||
rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse);
|
||||
rpc ListServices(ListServicesRequest) returns (ListServicesResponse);
|
||||
rpc GetService(GetServiceRequest) returns (Service);
|
||||
rpc KickService(KickServiceRequest) returns (KickServiceResponse);
|
||||
rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse);
|
||||
rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse);
|
||||
}
|
||||
```
|
||||
|
||||
REST Swagger definition can be found at [Swagger Definition](../swagger/swagger.yaml)
|
||||
|
||||
For example, you can use the following request to kick an edge node offline:
|
||||
|
||||
```
|
||||
curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id}
|
||||
```
|
||||
|
||||
Or check which RPCs a microservice has registered:
|
||||
|
||||
|
||||
```
|
||||
curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}
|
||||
```
|
||||
|
||||
Note: gRPC/REST depends on the DAO backend, with two options: ```buntdb``` and ```sqlite3```. Both use in-memory mode. For performance considerations, the default backend uses buntdb, and the count field in the list interface always returns -1. When you configure the backend to ```sqlite3```, it means you have a strong OLTP requirement for connected microservices and edge nodes on Frontier, such as encapsulating the web on Frontier. In this case, the count will return the total number.
|
||||
|
||||
@@ -0,0 +1,586 @@
|
||||
## 使用
|
||||
|
||||
### 示例
|
||||
|
||||
**聊天室**
|
||||
|
||||
目录[examples/chatroom](../examples/chatroom)下有简单的聊天室示例,仅100行代码实现一个的聊天室功能,可以通过
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
|
||||
在bin目录下得到```chatroom_service```和```chatroom_egent```可执行程序,运行示例:
|
||||
|
||||
https://github.com/singchia/frontier/assets/15531166/18b01d96-e30b-450f-9610-917d65259c30
|
||||
|
||||
在这个示例你可以看到上线离线通知,消息Publish等功能。
|
||||
|
||||
**直播**
|
||||
|
||||
目录[examples/rtmp](../examples/rtmp)下有简单的直播示例,仅80行代码实现一个的直播代理功能,可以通过
|
||||
|
||||
```
|
||||
make examples
|
||||
```
|
||||
|
||||
在bin目录下得到```rtmp_service```和```rtmp_edge```可执行程序,运行后,使用[OBS](https://obsproject.com/)连接rtmp_edge即可直播代理:
|
||||
|
||||
<img src="./diagram/rtmp.png" width="100%">
|
||||
|
||||
在这个示例你可以看到Multiplexer和Stream功能。
|
||||
|
||||
### 微服务如何使用
|
||||
|
||||
**微服务侧获取Service**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, err := service.NewService(dialer)
|
||||
// 开始使用service
|
||||
}
|
||||
```
|
||||
|
||||
**微服务接收获取ID、上线/离线通知**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
svc.RegisterGetEdgeID(context.TODO(), getID)
|
||||
svc.RegisterEdgeOnline(context.TODO(), online)
|
||||
svc.RegisterEdgeOffline(context.TODO(), offline)
|
||||
}
|
||||
|
||||
// service可以根据meta分配id给edge
|
||||
func getID(meta []byte) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// edge上线
|
||||
func online(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// edge离线
|
||||
func offline(edgeID uint64, meta []byte, addr net.Addr) error {
|
||||
return nil
|
||||
}
|
||||
```
|
||||
|
||||
**微服务发布消息到边缘节点**:
|
||||
|
||||
前提需要该Edge在线,否则会找不到Edge
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
msg := svc.NewMessage([]byte("test"))
|
||||
// 发布一条消息到ID为1001的边缘节点
|
||||
err := svc.Publish(context.TODO(), 1001, msg)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**微服务声明接收Topic**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// 在获取svc时声明需要接收的topic
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceReceiveTopics([]string{"foo"}))
|
||||
for {
|
||||
// 接收消息
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示svc生命周期已终结,不可以再使用
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// 处理完msg后,需要通知调用方已完成
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**微服务调用边缘节点的RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
req := svc.NewRequest([]byte("test"))
|
||||
// 调用ID为1001边缘节点的foo方法,前提是边缘节点需要预注册该方法
|
||||
rsp, err := svc.Call(context.TODO(), 1001, "foo", req)
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
**微服务注册方法以供边缘节点调用**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// 注册一个"echo"方法
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**微服务打开边缘节点的点到点流**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
svc, _ := service.NewService(dialer)
|
||||
// 打开ID为1001边缘节点的新流(同时st也是一个net.Conn),前提是edge需要AcceptStream接收该流
|
||||
st, err := svc.OpenStream(context.TODO(), 1001)
|
||||
}
|
||||
```
|
||||
基于这个新打开流,你可以用来传递文件、代理流量等。
|
||||
|
||||
|
||||
**微服务接收流**
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// 在获取svc时声明需要微服务名,在边缘打开流时需要指定该微服务名
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示svc生命周期已终结,不可以再使用
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// 使用stream,这个stream同时是个net.Conn,你可以Read/Write/Close,同时也可以RPC和消息
|
||||
}
|
||||
}
|
||||
```
|
||||
基于这个新打开流,你可以用来传递文件、代理流量等。
|
||||
|
||||
**消息、RPC和流一起来吧!**
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/service"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30011")
|
||||
}
|
||||
// 在获取svc时声明需要微服务名,在边缘打开流时需要指定该微服务名
|
||||
svc, _ := service.NewService(dialer, service.OptionServiceName("service-name"))
|
||||
|
||||
// 接收流
|
||||
go func() {
|
||||
for {
|
||||
st, err := svc.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示svc生命周期已终结,不可以再使用
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// 使用stream,这个stream同时是个net.Conn,你可以Read/Write/Close,同时也可以RPC和消息
|
||||
}
|
||||
}()
|
||||
|
||||
// 注册一个"echo"方法
|
||||
svc.Register(context.TODO(), "echo", echo)
|
||||
|
||||
// 接收消息
|
||||
for {
|
||||
msg, err := svc.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示svc生命周期已终结,不可以再使用
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// 处理完msg后,需要通知调用方已完成
|
||||
msg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
### 边缘节点/客户端如何使用
|
||||
|
||||
**边缘节点侧获取Edge**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// 开始使用eg ...
|
||||
}
|
||||
```
|
||||
|
||||
**边缘节点发布消息到Topic**:
|
||||
|
||||
Service需要提前声明接收该Topic,或者在配置文件中配置外部MQ。
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// 开始使用eg
|
||||
msg := eg.NewMessage([]byte("test"))
|
||||
err := eg.Publish(context.TODO(), "foo", msg)
|
||||
// ...
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**边缘节点接收消息**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
// 接收消息
|
||||
msg, err := eg.Receive(context.TODO())
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示eg生命周期已终结,不可以再使用
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Println("receive err:", err)
|
||||
continue
|
||||
}
|
||||
// 处理完msg后,需要通知调用方已完成
|
||||
msg.Done()
|
||||
}
|
||||
// ...
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**边缘节点调用微服务RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// 开始使用eg
|
||||
req := eg.NewRequest([]byte("test"))
|
||||
// 调用echo方法,Frontier会查找并转发请求到相应的微服务
|
||||
rsp, err := eg.Call(context.TODO(), "echo", req)
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
**边缘节点注册RPC**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
"github.com/singchia/geminio"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
// 注册一个"echo"方法
|
||||
eg.Register(context.TODO(), "echo", echo)
|
||||
// ...
|
||||
}
|
||||
|
||||
func echo(ctx context.Context, req geminio.Request, rsp geminio.Response) {
|
||||
value := req.Data()
|
||||
rsp.SetData(value)
|
||||
}
|
||||
```
|
||||
|
||||
**边缘节点打开微服务的点到点流**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
st, err := eg.OpenStream("service-name")
|
||||
// ...
|
||||
}
|
||||
```
|
||||
基于这个新打开流,你可以用来传递文件、代理流量等。
|
||||
|
||||
**边缘节点接收流**:
|
||||
|
||||
```golang
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"fmt"
|
||||
"io"
|
||||
"github.com/singchia/frontier/api/dataplane/v1/edge"
|
||||
)
|
||||
|
||||
func main() {
|
||||
dialer := func() (net.Conn, error) {
|
||||
return net.Dial("tcp", "127.0.0.1:30012")
|
||||
}
|
||||
eg, _ := edge.NewEdge(dialer)
|
||||
for {
|
||||
stream, err := eg.AcceptStream()
|
||||
if err == io.EOF {
|
||||
// 收到EOF表示eg生命周期已终结,不可以再使用
|
||||
return
|
||||
} else if err != nil {
|
||||
fmt.Println("accept stream err:", err)
|
||||
continue
|
||||
}
|
||||
// 使用stream,这个stream同时是个net.Conn,你可以Read/Write/Close,同时也可以RPC和消息
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 错误处理
|
||||
|
||||
<table><thead>
|
||||
<tr>
|
||||
<th>错误</th>
|
||||
<th>描述和处理</th>
|
||||
</tr></thead>
|
||||
<tbody>
|
||||
<tr>
|
||||
<td>io.EOF</td>
|
||||
<td>收到EOF表示流或连接已关闭,需要退出Receive、AcceptStream等操作</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>io.ErrShortBuffer</td>
|
||||
<td>发送端或者接收端的Buffer已满,可以设置OptionServiceBufferSize或OptionEdgeBufferSize来调整</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ErrEdgeNotOnline</td>
|
||||
<td>表示该边缘节点不在线,需要检查边缘连接</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.ServiceNotOnline</td>
|
||||
<td>表示微服务不在线,需要检查微服务连接信息或连接</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.RPCNotOnline</td>
|
||||
<td>表示Call的RPC不在线</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>apis.TopicOnline</td>
|
||||
<td>表示Publish的Topic不在线</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>其他错误</td>
|
||||
<td>还存在Timeout、BufferFull等</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
需要注意的是,如果关闭流,在流上正在阻塞的方法都会立即得到io.EOF,如果关闭入口(Service和Edge),则所有在此之上的流,阻塞的方法都会立即得到io.EOF。
|
||||
|
||||
### 控制面
|
||||
|
||||
Frontier控制面提供gRPC和Rest接口,运维人员可以使用这些api来确定本实例的连接情况,gRPC和Rest都由默认端口```:30010```提供服务。
|
||||
|
||||
**GRPC** 详见[Protobuf定义](../api/controlplane/frontier/v1/controlplane.proto)
|
||||
|
||||
|
||||
```protobuf
|
||||
service ControlPlane {
|
||||
rpc ListEdges(ListEdgesRequest) returns (ListEdgesResponse);
|
||||
rpc GetEdge(GetEdgeRequest) returns (Edge);
|
||||
rpc KickEdge(KickEdgeRequest) returns (KickEdgeResponse);
|
||||
rpc ListEdgeRPCs(ListEdgeRPCsRequest) returns (ListEdgeRPCsResponse);
|
||||
rpc ListServices(ListServicesRequest) returns (ListServicesResponse);
|
||||
rpc GetService(GetServiceRequest) returns (Service);
|
||||
rpc KickService(KickServiceRequest) returns (KickServiceResponse);
|
||||
rpc ListServiceRPCs(ListServiceRPCsRequest) returns (ListServiceRPCsResponse);
|
||||
rpc ListServiceTopics(ListServiceTopicsRequest) returns (ListServiceTopicsResponse);
|
||||
}
|
||||
```
|
||||
|
||||
**REST** Swagger详见[Swagger定义](../swagger/swagger.yaml)
|
||||
|
||||
例如你可以使用下面请求来踢除某个边缘节点下线:
|
||||
|
||||
```
|
||||
curl -X DELETE http://127.0.0.1:30010/v1/edges/{edge_id}
|
||||
```
|
||||
或查看某个微服务注册了哪些RPC:
|
||||
|
||||
```
|
||||
curl -X GET http://127.0.0.1:30010/v1/services/rpcs?service_id={service_id}
|
||||
```
|
||||
|
||||
**注意**:gRPC/Rest依赖dao backend,有两个选项```buntdb```和```sqlite```,都是使用的in-memory模式,为性能考虑,默认backend使用buntdb,并且列表接口返回字段count永远是-1,当你配置backend为sqlite3时,会认为你对在Frontier上连接的微服务和边缘节点有强烈的OLTP需求,例如在Frontier上封装web,此时count才会返回总数。
|
||||
|
||||
|
||||
Reference in New Issue
Block a user