add CMuxPlugin

This commit is contained in:
smallnest
2021-04-09 18:33:54 +08:00
parent 39f270298e
commit 7cece5b70e
4 changed files with 32 additions and 12 deletions
+1
View File
@@ -11,6 +11,7 @@
- support DNS as service discovery
- support rpcx flow tracing
- support websocket as the transport like tcp,kcp and quic
- add CMuxPlugin to allow developing customzied services by using the same single port
## 6.0
+2 -4
View File
@@ -1,7 +1,5 @@
<p style="font-size: 64px;color: red;">rpcx重新建立了tag,将tag的版本号控制在 <b>1.x.x</b> 之内,看[issue#567](https://github.com/smallnest/rpcx/issues/567)对你的影响</p>
**stable branch**: v1.6.2
**development branch**: master
- **stable branch**: v1.6.2
- **development branch**: master
<a href="https://rpcx.io/"><img height="160" src="http://rpcx.io/logos/rpcx-logo-text.png"></a>
+7 -2
View File
@@ -28,6 +28,11 @@ func (s *Server) startGateway(network string, ln net.Listener) net.Listener {
rpcxLn := m.Match(rpcxPrefixByteMatcher())
// mux Plugins
if s.Plugins != nil {
s.Plugins.MuxMatch(m)
}
if !s.DisableJSONRPC {
jsonrpc2Ln := m.Match(cmux.HTTP1HeaderField("X-JSONRPC-2.0", "true"))
go s.startJSONRPC2(jsonrpc2Ln)
@@ -108,7 +113,7 @@ func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, pa
req, err := HTTPRequest2RpcxRequest(r)
defer protocol.FreeMsg(req)
//set headers
// set headers
wh.Set(XVersion, r.Header.Get(XVersion))
wh.Set(XMessageID, r.Header.Get(XMessageID))
@@ -175,7 +180,7 @@ func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, pa
}
s.Plugins.DoPreWriteResponse(newCtx, req, nil, nil)
if len(resMetadata) > 0 { //copy meta in context to request
if len(resMetadata) > 0 { // copy meta in context to request
meta := res.Metadata
if meta == nil {
res.Metadata = resMetadata
+22 -6
View File
@@ -6,10 +6,11 @@ import (
"github.com/smallnest/rpcx/errors"
"github.com/smallnest/rpcx/protocol"
"github.com/soheilhy/cmux"
)
//PluginContainer represents a plugin container that defines all methods to manage plugins.
//And it also defines all extension points.
// PluginContainer represents a plugin container that defines all methods to manage plugins.
// And it also defines all extension points.
type PluginContainer interface {
Add(plugin Plugin)
Remove(plugin Plugin)
@@ -36,10 +37,12 @@ type PluginContainer interface {
DoPostWriteRequest(ctx context.Context, r *protocol.Message, e error) error
DoHeartbeatRequest(ctx context.Context, req *protocol.Message) error
MuxMatch(m cmux.CMux)
}
// Plugin is the server plugin interface.
type Plugin interface {}
type Plugin interface{}
type (
// RegisterPlugin is .
@@ -112,6 +115,10 @@ type (
HeartbeatPlugin interface {
HeartbeatRequest(ctx context.Context, req *protocol.Message) error
}
CMuxPlugin interface {
MuxMatch(m cmux.CMux)
}
)
// pluginContainer implements PluginContainer interface.
@@ -198,13 +205,13 @@ func (p *pluginContainer) DoUnregister(name string) error {
return nil
}
//DoPostConnAccept handles accepted conn
// DoPostConnAccept handles accepted conn
func (p *pluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool) {
var flag bool
for i := range p.plugins {
if plugin, ok := p.plugins[i].(PostConnAcceptPlugin); ok {
conn, flag = plugin.HandleConnAccept(conn)
if !flag { //interrupt
if !flag { // interrupt
conn.Close()
return conn, false
}
@@ -213,7 +220,7 @@ func (p *pluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool) {
return conn, true
}
//DoPostConnClose handles closed conn
// DoPostConnClose handles closed conn
func (p *pluginContainer) DoPostConnClose(conn net.Conn) bool {
var flag bool
for i := range p.plugins {
@@ -368,3 +375,12 @@ func (p *pluginContainer) DoHeartbeatRequest(ctx context.Context, r *protocol.Me
return nil
}
// MuxMatch adds cmux Match.
func (p *pluginContainer) MuxMatch(m cmux.CMux) {
for i := range p.plugins {
if plugin, ok := p.plugins[i].(CMuxPlugin); ok {
plugin.MuxMatch(m)
}
}
}