mirror of
https://github.com/fatedier/frp.git
synced 2025-05-10 00:48:58 +00:00
Compare commits
No commits in common. "15a245766e293098c23b96ae76cdf8117b245714" and "9ba6a064709cbf8a0036df905c3f0e1262bb0355" have entirely different histories.
15a245766e
...
9ba6a06470
@ -121,7 +121,7 @@ func (c *Controller) CleanWorker(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) ListenClient(name string, sk string, allowUsers []string) (chan string, error) {
|
||||
func (c *Controller) ListenClient(name string, sk string, allowUsers []string) chan string {
|
||||
cfg := &ClientCfg{
|
||||
name: name,
|
||||
sk: sk,
|
||||
@ -130,11 +130,8 @@ func (c *Controller) ListenClient(name string, sk string, allowUsers []string) (
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if _, ok := c.clientCfgs[name]; ok {
|
||||
return nil, fmt.Errorf("proxy [%s] is repeated", name)
|
||||
}
|
||||
c.clientCfgs[name] = cfg
|
||||
return cfg.sidCh, nil
|
||||
return cfg.sidCh
|
||||
}
|
||||
|
||||
func (c *Controller) CloseClient(name string) {
|
||||
|
@ -577,11 +577,6 @@ func (ctl *Control) RegisterProxy(pxyMsg *msg.NewProxy) (remoteAddr string, err
|
||||
}()
|
||||
}
|
||||
|
||||
if ctl.pxyManager.Exist(pxyMsg.ProxyName) {
|
||||
err = fmt.Errorf("proxy [%s] already exists", pxyMsg.ProxyName)
|
||||
return
|
||||
}
|
||||
|
||||
remoteAddr, err = pxy.Run()
|
||||
if err != nil {
|
||||
return
|
||||
|
@ -17,10 +17,10 @@ package proxy
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
libio "github.com/fatedier/golib/io"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
"github.com/fatedier/frp/pkg/util/limit"
|
||||
@ -30,10 +30,6 @@ import (
|
||||
"github.com/fatedier/frp/server/metrics"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.HTTPProxyConf{}), NewHTTPProxy)
|
||||
}
|
||||
|
||||
type HTTPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.HTTPProxyConf
|
||||
@ -41,17 +37,6 @@ type HTTPProxy struct {
|
||||
closeFuncs []func()
|
||||
}
|
||||
|
||||
func NewHTTPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.HTTPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &HTTPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *HTTPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
routeConfig := vhost.RouteConfig{
|
||||
@ -152,6 +137,10 @@ func (pxy *HTTPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *HTTPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *HTTPProxy) GetRealConn(remoteAddr string) (workConn net.Conn, err error) {
|
||||
xl := pxy.xl
|
||||
rAddr, errRet := net.ResolveTCPAddr("tcp", remoteAddr)
|
||||
|
@ -15,34 +15,20 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
"github.com/fatedier/frp/pkg/util/util"
|
||||
"github.com/fatedier/frp/pkg/util/vhost"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.HTTPSProxyConf{}), NewHTTPSProxy)
|
||||
}
|
||||
|
||||
type HTTPSProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.HTTPSProxyConf
|
||||
}
|
||||
|
||||
func NewHTTPSProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.HTTPSProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &HTTPSProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
routeConfig := &vhost.RouteConfig{}
|
||||
@ -81,7 +67,7 @@ func (pxy *HTTPSProxy) Run() (remoteAddr string, err error) {
|
||||
addrs = append(addrs, util.CanonicalAddr(routeConfig.Domain, pxy.serverCfg.VhostHTTPSPort))
|
||||
}
|
||||
|
||||
pxy.startCommonTCPListenersHandler()
|
||||
pxy.startListenHandler(pxy, HandleUserTCPConnection)
|
||||
remoteAddr = strings.Join(addrs, ",")
|
||||
return
|
||||
}
|
||||
@ -90,6 +76,10 @@ func (pxy *HTTPSProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *HTTPSProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *HTTPSProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@ -37,12 +36,6 @@ import (
|
||||
"github.com/fatedier/frp/server/metrics"
|
||||
)
|
||||
|
||||
var proxyFactoryRegistry = map[reflect.Type]func(*BaseProxy, config.ProxyConf) Proxy{}
|
||||
|
||||
func RegisterProxyFactory(proxyConfType reflect.Type, factory func(*BaseProxy, config.ProxyConf) Proxy) {
|
||||
proxyFactoryRegistry[proxyConfType] = factory
|
||||
}
|
||||
|
||||
type GetWorkConnFn func() (net.Conn, error)
|
||||
|
||||
type Proxy interface {
|
||||
@ -70,7 +63,6 @@ type BaseProxy struct {
|
||||
limiter *rate.Limiter
|
||||
userInfo plugin.UserInfo
|
||||
loginMsg *msg.Login
|
||||
pxyConf config.ProxyConf
|
||||
|
||||
mu sync.RWMutex
|
||||
xl *xlog.Logger
|
||||
@ -101,10 +93,6 @@ func (pxy *BaseProxy) GetLoginMsg() *msg.Login {
|
||||
return pxy.loginMsg
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *BaseProxy) Close() {
|
||||
xl := xlog.FromContextSafe(pxy.ctx)
|
||||
xl.Info("proxy closing")
|
||||
@ -167,8 +155,10 @@ func (pxy *BaseProxy) GetWorkConnFromPool(src, dst net.Addr) (workConn net.Conn,
|
||||
return
|
||||
}
|
||||
|
||||
// startCommonTCPListenersHandler start a goroutine handler for each listener.
|
||||
func (pxy *BaseProxy) startCommonTCPListenersHandler() {
|
||||
// startListenHandler start a goroutine handler for each listener.
|
||||
// p: p will just be passed to handler(Proxy, utilnet.Conn).
|
||||
// handler: each proxy type can set different handler function to deal with connections accepted from listeners.
|
||||
func (pxy *BaseProxy) startListenHandler(p Proxy, handler func(Proxy, net.Conn, config.ServerCommonConf)) {
|
||||
xl := xlog.FromContextSafe(pxy.ctx)
|
||||
for _, listener := range pxy.listeners {
|
||||
go func(l net.Listener) {
|
||||
@ -197,72 +187,12 @@ func (pxy *BaseProxy) startCommonTCPListenersHandler() {
|
||||
return
|
||||
}
|
||||
xl.Info("get a user connection [%s]", c.RemoteAddr().String())
|
||||
go pxy.handleUserTCPConnection(c)
|
||||
go handler(p, c, pxy.serverCfg)
|
||||
}
|
||||
}(listener)
|
||||
}
|
||||
}
|
||||
|
||||
// HandleUserTCPConnection is used for incoming user TCP connections.
|
||||
func (pxy *BaseProxy) handleUserTCPConnection(userConn net.Conn) {
|
||||
xl := xlog.FromContextSafe(pxy.Context())
|
||||
defer userConn.Close()
|
||||
|
||||
serverCfg := pxy.serverCfg
|
||||
cfg := pxy.pxyConf.GetBaseConfig()
|
||||
// server plugin hook
|
||||
rc := pxy.GetResourceController()
|
||||
content := &plugin.NewUserConnContent{
|
||||
User: pxy.GetUserInfo(),
|
||||
ProxyName: pxy.GetName(),
|
||||
ProxyType: cfg.ProxyType,
|
||||
RemoteAddr: userConn.RemoteAddr().String(),
|
||||
}
|
||||
_, err := rc.PluginManager.NewUserConn(content)
|
||||
if err != nil {
|
||||
xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
|
||||
return
|
||||
}
|
||||
|
||||
// try all connections from the pool
|
||||
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer workConn.Close()
|
||||
|
||||
var local io.ReadWriteCloser = workConn
|
||||
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
|
||||
if cfg.UseEncryption {
|
||||
local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
|
||||
if err != nil {
|
||||
xl.Error("create encryption stream error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if cfg.UseCompression {
|
||||
local = libio.WithCompression(local)
|
||||
}
|
||||
|
||||
if pxy.GetLimiter() != nil {
|
||||
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
|
||||
return local.Close()
|
||||
})
|
||||
}
|
||||
|
||||
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
|
||||
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
|
||||
|
||||
name := pxy.GetName()
|
||||
proxyType := cfg.ProxyType
|
||||
metrics.Server.OpenConnection(name, proxyType)
|
||||
inCount, outCount, _ := libio.Join(local, userConn)
|
||||
metrics.Server.CloseConnection(name, proxyType)
|
||||
metrics.Server.AddTrafficIn(name, proxyType, inCount)
|
||||
metrics.Server.AddTrafficOut(name, proxyType, outCount)
|
||||
xl.Debug("join connections closed")
|
||||
}
|
||||
|
||||
func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.ResourceController, poolCount int,
|
||||
getWorkConnFn GetWorkConnFn, pxyConf config.ProxyConf, serverCfg config.ServerCommonConf, loginMsg *msg.Login,
|
||||
) (pxy Proxy, err error) {
|
||||
@ -286,18 +216,114 @@ func NewProxy(ctx context.Context, userInfo plugin.UserInfo, rc *controller.Reso
|
||||
ctx: xlog.NewContext(ctx, xl),
|
||||
userInfo: userInfo,
|
||||
loginMsg: loginMsg,
|
||||
pxyConf: pxyConf,
|
||||
}
|
||||
|
||||
factory := proxyFactoryRegistry[reflect.TypeOf(pxyConf)]
|
||||
if factory == nil {
|
||||
switch cfg := pxyConf.(type) {
|
||||
case *config.TCPProxyConf:
|
||||
basePxy.usedPortsNum = 1
|
||||
pxy = &TCPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.TCPMuxProxyConf:
|
||||
pxy = &TCPMuxProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.HTTPProxyConf:
|
||||
pxy = &HTTPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.HTTPSProxyConf:
|
||||
pxy = &HTTPSProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.UDPProxyConf:
|
||||
basePxy.usedPortsNum = 1
|
||||
pxy = &UDPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.STCPProxyConf:
|
||||
pxy = &STCPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.XTCPProxyConf:
|
||||
pxy = &XTCPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
case *config.SUDPProxyConf:
|
||||
pxy = &SUDPProxy{
|
||||
BaseProxy: &basePxy,
|
||||
cfg: cfg,
|
||||
}
|
||||
default:
|
||||
return pxy, fmt.Errorf("proxy type not support")
|
||||
}
|
||||
pxy = factory(&basePxy, pxyConf)
|
||||
if pxy == nil {
|
||||
return nil, fmt.Errorf("proxy not created")
|
||||
return
|
||||
}
|
||||
|
||||
// HandleUserTCPConnection is used for incoming user TCP connections.
|
||||
// It can be used for tcp, http, https type.
|
||||
func HandleUserTCPConnection(pxy Proxy, userConn net.Conn, serverCfg config.ServerCommonConf) {
|
||||
xl := xlog.FromContextSafe(pxy.Context())
|
||||
defer userConn.Close()
|
||||
|
||||
// server plugin hook
|
||||
rc := pxy.GetResourceController()
|
||||
content := &plugin.NewUserConnContent{
|
||||
User: pxy.GetUserInfo(),
|
||||
ProxyName: pxy.GetName(),
|
||||
ProxyType: pxy.GetConf().GetBaseConfig().ProxyType,
|
||||
RemoteAddr: userConn.RemoteAddr().String(),
|
||||
}
|
||||
return pxy, nil
|
||||
_, err := rc.PluginManager.NewUserConn(content)
|
||||
if err != nil {
|
||||
xl.Warn("the user conn [%s] was rejected, err:%v", content.RemoteAddr, err)
|
||||
return
|
||||
}
|
||||
|
||||
// try all connections from the pool
|
||||
workConn, err := pxy.GetWorkConnFromPool(userConn.RemoteAddr(), userConn.LocalAddr())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer workConn.Close()
|
||||
|
||||
var local io.ReadWriteCloser = workConn
|
||||
cfg := pxy.GetConf().GetBaseConfig()
|
||||
xl.Trace("handler user tcp connection, use_encryption: %t, use_compression: %t", cfg.UseEncryption, cfg.UseCompression)
|
||||
if cfg.UseEncryption {
|
||||
local, err = libio.WithEncryption(local, []byte(serverCfg.Token))
|
||||
if err != nil {
|
||||
xl.Error("create encryption stream error: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if cfg.UseCompression {
|
||||
local = libio.WithCompression(local)
|
||||
}
|
||||
|
||||
if pxy.GetLimiter() != nil {
|
||||
local = libio.WrapReadWriteCloser(limit.NewReader(local, pxy.GetLimiter()), limit.NewWriter(local, pxy.GetLimiter()), func() error {
|
||||
return local.Close()
|
||||
})
|
||||
}
|
||||
|
||||
xl.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(),
|
||||
workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String())
|
||||
|
||||
name := pxy.GetName()
|
||||
proxyType := pxy.GetConf().GetBaseConfig().ProxyType
|
||||
metrics.Server.OpenConnection(name, proxyType)
|
||||
inCount, outCount, _ := libio.Join(local, userConn)
|
||||
metrics.Server.CloseConnection(name, proxyType)
|
||||
metrics.Server.AddTrafficIn(name, proxyType, inCount)
|
||||
metrics.Server.AddTrafficOut(name, proxyType, outCount)
|
||||
xl.Debug("join connections closed")
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
@ -324,13 +350,6 @@ func (pm *Manager) Add(name string, pxy Proxy) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *Manager) Exist(name string) bool {
|
||||
pm.mu.RLock()
|
||||
defer pm.mu.RUnlock()
|
||||
_, ok := pm.pxys[name]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (pm *Manager) Del(name string) {
|
||||
pm.mu.Lock()
|
||||
defer pm.mu.Unlock()
|
||||
|
@ -15,31 +15,16 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.STCPProxyConf{}), NewSTCPProxy)
|
||||
}
|
||||
|
||||
type STCPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.STCPProxyConf
|
||||
}
|
||||
|
||||
func NewSTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.STCPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &STCPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *STCPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
allowUsers := pxy.cfg.AllowUsers
|
||||
@ -55,7 +40,7 @@ func (pxy *STCPProxy) Run() (remoteAddr string, err error) {
|
||||
pxy.listeners = append(pxy.listeners, listener)
|
||||
xl.Info("stcp proxy custom listen success")
|
||||
|
||||
pxy.startCommonTCPListenersHandler()
|
||||
pxy.startListenHandler(pxy, HandleUserTCPConnection)
|
||||
return
|
||||
}
|
||||
|
||||
@ -63,6 +48,10 @@ func (pxy *STCPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *STCPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *STCPProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
pxy.rc.VisitorManager.CloseListener(pxy.GetName())
|
||||
|
@ -15,31 +15,16 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.SUDPProxyConf{}), NewSUDPProxy)
|
||||
}
|
||||
|
||||
type SUDPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.SUDPProxyConf
|
||||
}
|
||||
|
||||
func NewSUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.SUDPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &SUDPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *SUDPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
allowUsers := pxy.cfg.AllowUsers
|
||||
@ -55,7 +40,7 @@ func (pxy *SUDPProxy) Run() (remoteAddr string, err error) {
|
||||
pxy.listeners = append(pxy.listeners, listener)
|
||||
xl.Info("sudp proxy custom listen success")
|
||||
|
||||
pxy.startCommonTCPListenersHandler()
|
||||
pxy.startListenHandler(pxy, HandleUserTCPConnection)
|
||||
return
|
||||
}
|
||||
|
||||
@ -63,6 +48,10 @@ func (pxy *SUDPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *SUDPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *SUDPProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
pxy.rc.VisitorManager.CloseListener(pxy.GetName())
|
||||
|
@ -17,39 +17,24 @@ package proxy
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.TCPProxyConf{}), NewTCPProxy)
|
||||
}
|
||||
|
||||
type TCPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.TCPProxyConf
|
||||
|
||||
realBindPort int
|
||||
}
|
||||
|
||||
func NewTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.TCPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
baseProxy.usedPortsNum = 1
|
||||
return &TCPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
realPort int
|
||||
}
|
||||
|
||||
func (pxy *TCPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
if pxy.cfg.Group != "" {
|
||||
l, realBindPort, errRet := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort)
|
||||
l, realPort, errRet := pxy.rc.TCPGroupCtl.Listen(pxy.name, pxy.cfg.Group, pxy.cfg.GroupKey, pxy.serverCfg.ProxyBindAddr, pxy.cfg.RemotePort)
|
||||
if errRet != nil {
|
||||
err = errRet
|
||||
return
|
||||
@ -59,20 +44,20 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) {
|
||||
l.Close()
|
||||
}
|
||||
}()
|
||||
pxy.realBindPort = realBindPort
|
||||
pxy.realPort = realPort
|
||||
pxy.listeners = append(pxy.listeners, l)
|
||||
xl.Info("tcp proxy listen port [%d] in group [%s]", pxy.cfg.RemotePort, pxy.cfg.Group)
|
||||
} else {
|
||||
pxy.realBindPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
|
||||
pxy.realPort, err = pxy.rc.TCPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
pxy.rc.TCPPortManager.Release(pxy.realBindPort)
|
||||
pxy.rc.TCPPortManager.Release(pxy.realPort)
|
||||
}
|
||||
}()
|
||||
listener, errRet := net.Listen("tcp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort)))
|
||||
listener, errRet := net.Listen("tcp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realPort)))
|
||||
if errRet != nil {
|
||||
err = errRet
|
||||
return
|
||||
@ -81,9 +66,9 @@ func (pxy *TCPProxy) Run() (remoteAddr string, err error) {
|
||||
xl.Info("tcp proxy listen port [%d]", pxy.cfg.RemotePort)
|
||||
}
|
||||
|
||||
pxy.cfg.RemotePort = pxy.realBindPort
|
||||
remoteAddr = fmt.Sprintf(":%d", pxy.realBindPort)
|
||||
pxy.startCommonTCPListenersHandler()
|
||||
pxy.cfg.RemotePort = pxy.realPort
|
||||
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
|
||||
pxy.startListenHandler(pxy, HandleUserTCPConnection)
|
||||
return
|
||||
}
|
||||
|
||||
@ -91,9 +76,13 @@ func (pxy *TCPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *TCPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *TCPProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
if pxy.cfg.Group == "" {
|
||||
pxy.rc.TCPPortManager.Release(pxy.realBindPort)
|
||||
pxy.rc.TCPPortManager.Release(pxy.realPort)
|
||||
}
|
||||
}
|
||||
|
@ -17,35 +17,21 @@ package proxy
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
"github.com/fatedier/frp/pkg/consts"
|
||||
"github.com/fatedier/frp/pkg/util/util"
|
||||
"github.com/fatedier/frp/pkg/util/vhost"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.TCPMuxProxyConf{}), NewTCPMuxProxy)
|
||||
}
|
||||
|
||||
type TCPMuxProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.TCPMuxProxyConf
|
||||
}
|
||||
|
||||
func NewTCPMuxProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.TCPMuxProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &TCPMuxProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *TCPMuxProxy) httpConnectListen(
|
||||
domain, routeByHTTPUser, httpUser, httpPwd string, addrs []string) ([]string, error,
|
||||
) {
|
||||
@ -92,7 +78,7 @@ func (pxy *TCPMuxProxy) httpConnectRun() (remoteAddr string, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
pxy.startCommonTCPListenersHandler()
|
||||
pxy.startListenHandler(pxy, HandleUserTCPConnection)
|
||||
remoteAddr = strings.Join(addrs, ",")
|
||||
return remoteAddr, err
|
||||
}
|
||||
@ -115,6 +101,10 @@ func (pxy *TCPMuxProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *TCPMuxProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *TCPMuxProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
}
|
||||
|
@ -19,12 +19,12 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fatedier/golib/errors"
|
||||
libio "github.com/fatedier/golib/io"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
@ -34,15 +34,11 @@ import (
|
||||
"github.com/fatedier/frp/server/metrics"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.UDPProxyConf{}), NewUDPProxy)
|
||||
}
|
||||
|
||||
type UDPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.UDPProxyConf
|
||||
|
||||
realBindPort int
|
||||
realPort int
|
||||
|
||||
// udpConn is the listener of udp packages
|
||||
udpConn *net.UDPConn
|
||||
@ -63,33 +59,21 @@ type UDPProxy struct {
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
func NewUDPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.UDPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
baseProxy.usedPortsNum = 1
|
||||
return &UDPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *UDPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
pxy.realBindPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
|
||||
pxy.realPort, err = pxy.rc.UDPPortManager.Acquire(pxy.name, pxy.cfg.RemotePort)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("acquire port %d error: %v", pxy.cfg.RemotePort, err)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
pxy.rc.UDPPortManager.Release(pxy.realBindPort)
|
||||
pxy.rc.UDPPortManager.Release(pxy.realPort)
|
||||
}
|
||||
}()
|
||||
|
||||
remoteAddr = fmt.Sprintf(":%d", pxy.realBindPort)
|
||||
pxy.cfg.RemotePort = pxy.realBindPort
|
||||
addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realBindPort)))
|
||||
remoteAddr = fmt.Sprintf(":%d", pxy.realPort)
|
||||
pxy.cfg.RemotePort = pxy.realPort
|
||||
addr, errRet := net.ResolveUDPAddr("udp", net.JoinHostPort(pxy.serverCfg.ProxyBindAddr, strconv.Itoa(pxy.realPort)))
|
||||
if errRet != nil {
|
||||
err = errRet
|
||||
return
|
||||
@ -249,6 +233,10 @@ func (pxy *UDPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *UDPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *UDPProxy) Close() {
|
||||
pxy.mu.Lock()
|
||||
defer pxy.mu.Unlock()
|
||||
@ -266,5 +254,5 @@ func (pxy *UDPProxy) Close() {
|
||||
close(pxy.readCh)
|
||||
close(pxy.sendCh)
|
||||
}
|
||||
pxy.rc.UDPPortManager.Release(pxy.realBindPort)
|
||||
pxy.rc.UDPPortManager.Release(pxy.realPort)
|
||||
}
|
||||
|
@ -16,18 +16,14 @@ package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/fatedier/golib/errors"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/fatedier/frp/pkg/config"
|
||||
"github.com/fatedier/frp/pkg/msg"
|
||||
)
|
||||
|
||||
func init() {
|
||||
RegisterProxyFactory(reflect.TypeOf(&config.XTCPProxyConf{}), NewXTCPProxy)
|
||||
}
|
||||
|
||||
type XTCPProxy struct {
|
||||
*BaseProxy
|
||||
cfg *config.XTCPProxyConf
|
||||
@ -35,17 +31,6 @@ type XTCPProxy struct {
|
||||
closeCh chan struct{}
|
||||
}
|
||||
|
||||
func NewXTCPProxy(baseProxy *BaseProxy, cfg config.ProxyConf) Proxy {
|
||||
unwrapped, ok := cfg.(*config.XTCPProxyConf)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return &XTCPProxy{
|
||||
BaseProxy: baseProxy,
|
||||
cfg: unwrapped,
|
||||
}
|
||||
}
|
||||
|
||||
func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
|
||||
xl := pxy.xl
|
||||
|
||||
@ -58,10 +43,7 @@ func (pxy *XTCPProxy) Run() (remoteAddr string, err error) {
|
||||
if len(allowUsers) == 0 {
|
||||
allowUsers = []string{pxy.GetUserInfo().User}
|
||||
}
|
||||
sidCh, err := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk, allowUsers)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
sidCh := pxy.rc.NatHoleController.ListenClient(pxy.GetName(), pxy.cfg.Sk, allowUsers)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
@ -90,6 +72,10 @@ func (pxy *XTCPProxy) GetConf() config.ProxyConf {
|
||||
return pxy.cfg
|
||||
}
|
||||
|
||||
func (pxy *XTCPProxy) GetLimiter() *rate.Limiter {
|
||||
return pxy.limiter
|
||||
}
|
||||
|
||||
func (pxy *XTCPProxy) Close() {
|
||||
pxy.BaseProxy.Close()
|
||||
pxy.rc.NatHoleController.CloseClient(pxy.GetName())
|
||||
|
@ -37,7 +37,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
[tcp-port-not-allowed]
|
||||
type = tcp
|
||||
local_port = {{ .%s }}
|
||||
remote_port = 25001
|
||||
remote_port = 20001
|
||||
`, framework.TCPEchoServerPort)
|
||||
clientConf += fmt.Sprintf(`
|
||||
[tcp-port-unavailable]
|
||||
@ -55,7 +55,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
[udp-port-not-allowed]
|
||||
type = udp
|
||||
local_port = {{ .%s }}
|
||||
remote_port = 25003
|
||||
remote_port = 20003
|
||||
`, framework.UDPEchoServerPort)
|
||||
|
||||
f.RunProcesses([]string{serverConf}, []string{clientConf})
|
||||
@ -65,7 +65,7 @@ var _ = ginkgo.Describe("[Feature: Server Manager]", func() {
|
||||
framework.NewRequestExpect(f).PortName(tcpPortName).Ensure()
|
||||
|
||||
// Not Allowed
|
||||
framework.NewRequestExpect(f).Port(25001).ExpectError(true).Ensure()
|
||||
framework.NewRequestExpect(f).Port(25003).ExpectError(true).Ensure()
|
||||
|
||||
// Unavailable, already bind by frps
|
||||
framework.NewRequestExpect(f).PortName(consts.PortServerName).ExpectError(true).Ensure()
|
||||
|
Loading…
x
Reference in New Issue
Block a user