diff --git a/ai-convert/message.go b/ai-convert/message.go index 9e48ebf2..66263d06 100644 --- a/ai-convert/message.go +++ b/ai-convert/message.go @@ -47,14 +47,6 @@ type ModelConfig struct { TopLogProbs int `json:"top_logprobs,omitempty"` } -// Message represents a single message in the conversation. -type Message struct { - // Role indicates the role of the message sender (e.g., "system", "user", "assistant"). - Role string `json:"role"` - // Content contains the actual text of the message. - Content string `json:"content"` -} - type Error struct { Message string `json:"message"` Type string `json:"type"` diff --git a/drivers/app/extend-param.go b/drivers/app/extend-param.go index 7e728c38..bb3c50d4 100644 --- a/drivers/app/extend-param.go +++ b/drivers/app/extend-param.go @@ -27,6 +27,9 @@ func newAdditionalParam(params []*Additional) *additionalParam { } func (a *additionalParam) Execute(ctx http_service.IHttpContext) error { + if len(a.params) < 1 { + return nil + } contentType, _, _ := mime.ParseMediaType(ctx.Proxy().Body().ContentType()) bodyParams, formParams, err := parseBodyParams(ctx) if err != nil { diff --git a/drivers/output/kafka/config.go b/drivers/output/kafka/config.go index b44735f2..933f12fe 100644 --- a/drivers/output/kafka/config.go +++ b/drivers/output/kafka/config.go @@ -5,7 +5,7 @@ import ( "strings" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" "github.com/eolinker/eosc" ) @@ -114,8 +114,10 @@ func (c *Config) doCheck() (*ProducerConfig, error) { } // 只监听错误 s.Producer.Return.Errors = true - s.Producer.Return.Successes = false + s.Producer.Return.Successes = true s.Producer.RequiredAcks = sarama.WaitForLocal + s.Producer.Compression = sarama.CompressionGZIP + s.Producer.CompressionLevel = sarama.CompressionLevelDefault p.Address = strings.Split(conf.Address, ",") if len(p.Address) == 0 { diff --git a/drivers/output/kafka/driver.go b/drivers/output/kafka/driver.go index e7bd769b..9c52fde9 100644 --- a/drivers/output/kafka/driver.go +++ b/drivers/output/kafka/driver.go @@ -13,7 +13,6 @@ type Driver struct { func Check(v *Config, workers map[eosc.RequireId]eosc.IWorker) error { _, err := v.doCheck() - return err } diff --git a/drivers/output/kafka/kafka_test.go b/drivers/output/kafka/kafka_test.go index 341820d6..95dc759d 100644 --- a/drivers/output/kafka/kafka_test.go +++ b/drivers/output/kafka/kafka_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" "github.com/eolinker/eosc/log" ) diff --git a/drivers/output/kafka/producer.go b/drivers/output/kafka/producer.go index 961b13ea..feef5643 100644 --- a/drivers/output/kafka/producer.go +++ b/drivers/output/kafka/producer.go @@ -5,7 +5,7 @@ import ( "encoding/json" "sync" - "github.com/Shopify/sarama" + "github.com/IBM/sarama" "github.com/eolinker/eosc" "github.com/eolinker/eosc/formatter" "github.com/eolinker/eosc/log" @@ -69,7 +69,7 @@ func (o *tProducer) close() { o.cancel() o.cancel = nil } - o.producer.AsyncClose() + o.producer.Close() o.producer = nil o.formatter = nil } @@ -94,7 +94,6 @@ func (o *tProducer) output(entry eosc.IEntry) error { } log.DebugF("kafka send addr: %s, topic: %s, data: %s", o.conf.Address, o.conf.Topic, data) o.write(msg) - return nil } @@ -122,15 +121,24 @@ func (o *tProducer) work(producer sarama.AsyncProducer, ctx context.Context, cfg case err := <-producer.Errors(): log.DebugF("receive error.kafka addr: %s,kafka topic: %s,kafka partition: %d", cfg.Address, cfg.Topic, cfg.Partition) if err != nil { - log.Warnf("kafka error:%s", err.Error()) + log.Errorf("kafka error:%s", err.Error()) } case success, ok := <-producer.Successes(): - log.DebugF("receive success.kafka addr: %s,kafka topic: %s,kafka partition: %d", cfg.Address, cfg.Topic, cfg.Partition) if !ok { - log.Errorf("kafka producer closed") return } - log.DebugF("kafka success:%s", success) + log.DebugF("Message sent to partition %d at offset %d\n", success.Partition, success.Offset) + //key, err := success.Key.Encode() + //if err != nil { + // log.Errorf("kafka error:%s", err.Error()) + // continue + //} + //value, err := success.Value.Encode() + //if err != nil { + // log.Errorf("kafka error:%s", err.Error()) + // continue + //} + //log.DebugF("kafka success addr: %s, topic: %s, partition: %d, key: %s, value: %s", cfg.Address, cfg.Topic, success.Partition, string(key), string(value)) } } } diff --git a/drivers/plugins/ai-prompt/executor.go b/drivers/plugins/ai-prompt/executor.go index 944059a1..65981949 100644 --- a/drivers/plugins/ai-prompt/executor.go +++ b/drivers/plugins/ai-prompt/executor.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/sashabaranov/go-openai" "strings" ai_convert "github.com/eolinker/apinto/ai-convert" @@ -15,15 +16,15 @@ import ( ) type RequestMessage struct { - Model string `json:"model"` - Messages []Message `json:"messages"` - Variables map[string]string `json:"variables,omitempty"` + Model string `json:"model"` + Messages []openai.ChatCompletionMessage `json:"messages"` + Variables map[string]string `json:"variables,omitempty"` } -type Message struct { - Role string `json:"role"` - Content string `json:"content"` -} +//type Message struct { +// Role string `json:"role"` +// Content string `json:"content"` +//} type executor struct { drivers.WorkerBase @@ -112,7 +113,7 @@ func genRequestMessage(ctx http_context.IHttpContext, body []byte, prompt string } prompt = strings.Replace(prompt, fmt.Sprintf("{{%s}}", k), baseMsg.Config.Variables[k], -1) } - messages := []Message{ + messages := []openai.ChatCompletionMessage{ { Role: "system", Content: prompt, diff --git a/example/dubbo2/client/flag.go b/example/dubbo2/client/flag.go index bdf564b2..b7b1daea 100644 --- a/example/dubbo2/client/flag.go +++ b/example/dubbo2/client/flag.go @@ -3,10 +3,10 @@ package main import "flag" var ( - address = "127.0.0.1:8099" + address = "127.0.0.1:20000" ) func init() { - flag.StringVar(&address, "addr", "172.28.187.118:8099", "The address to connect dubbo2 server.") + flag.StringVar(&address, "addr", "127.0.0.1:20000", "The address to connect dubbo2 server.") flag.Parse() } diff --git a/example/dubbo2/client/main.go b/example/dubbo2/client/main.go index f970c5fc..a08add65 100644 --- a/example/dubbo2/client/main.go +++ b/example/dubbo2/client/main.go @@ -21,19 +21,19 @@ func init() { func client(addr string, serviceName, methodName string, timeout time.Duration, typesList []string, valuesList []hessian.Object) (interface{}, error) { arguments := make([]interface{}, 3) parameterValues := make([]reflect.Value, 3) - + arguments[0] = methodName arguments[1] = typesList arguments[2] = valuesList - + parameterValues[0] = reflect.ValueOf(arguments[0]) parameterValues[1] = reflect.ValueOf(arguments[1]) parameterValues[2] = reflect.ValueOf(arguments[2]) - + invoc := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("$invoke"), invocation.WithArguments(arguments), invocation.WithParameterValues(parameterValues)) - + url, err := common.NewURL(addr, common.WithProtocol(dubbo.DUBBO), common.WithParamsValue(constant.SerializationKey, constant.Hessian2Serialization), //common.WithParamsValue(constant.GenericFilterKey, "true"), @@ -47,16 +47,16 @@ func client(addr string, serviceName, methodName string, timeout time.Duration, return nil, err } dubboProtocol := dubbo.NewDubboProtocol() - + invoker := dubboProtocol.Refer(url) var resp interface{} invoc.SetReply(&resp) - + result := invoker.Invoke(context.Background(), invoc) if result.Error() != nil { return nil, result.Error() } - + return result.Result(), nil } @@ -71,22 +71,21 @@ func main() { func ComplexServer() { var types []string var valuesList []hessian.Object - + types = append(types, "object") - + server := map[string]interface{}{"id": 16, "name": "apinto", "email": "1324204490@qq.com"} - + valuesList = append(valuesList, map[string]interface{}{"time": time.Now(), "addr": "192.168.0.1", "server": server}) - resp, err := client(address, "api.Server", "ComplexServer", time.Second*3, types, valuesList) - + if err != nil { logger.Errorf("ComplexServer err=%s", err.Error()) return } v := resp.(*interface{}) vvv := formatData(*v) - + bytes, _ := json.Marshal(vvv) logger.Infof("ComplexServer result=%s", string(bytes)) } @@ -94,21 +93,21 @@ func ComplexServer() { func UpdateList() { var types []string var valuesList []hessian.Object - + types = append(types, "object") val1 := map[string]interface{}{"id": 16, "name": "apinto", "email": "1324204490@qq.com"} val2 := map[string]interface{}{"id": 16, "name": "apinto", "email": "1324204490@qq.com"} valuesList = append(valuesList, []interface{}{val1, val2}) - + resp, err := client(address, "api.Server", "UpdateList", time.Second*3, types, valuesList) - + if err != nil { logger.Errorf("UpdateList err=%s", err.Error()) return } v := resp.(*interface{}) vvv := formatData(*v) - + bytes, _ := json.Marshal(vvv) logger.Infof("UpdateList result=%s", string(bytes)) } @@ -116,66 +115,66 @@ func UpdateList() { func Update() { var types []string var valuesList []hessian.Object - + types = append(types, "object") valuesList = append(valuesList, map[string]interface{}{"id": 16, "name": "apinto", "email": "1324204490@qq.com"}) resp, err := client(address, "api.Server", "Update", time.Second*3, types, valuesList) - + if err != nil { logger.Errorf("Update err=%s", err.Error()) return } v := resp.(*interface{}) vvv := formatData(*v) - + bytes, _ := json.Marshal(vvv) - + logger.Infof("Update result=%s", string(bytes)) } func List() { var types []string var valuesList []hessian.Object - + types = append(types, "object") valuesList = append(valuesList, map[string]interface{}{"id": 16, "name": "apinto", "email": "1324204490@qq.com"}) resp, err := client(address, "api.Server", "List", time.Second*3, types, valuesList) - + if err != nil { logger.Errorf("List err=%s", err.Error()) return } v := resp.(*interface{}) vvv := formatData(*v) - + bytes, _ := json.Marshal(vvv) - + logger.Infof("List result=%s", string(bytes)) } func GetById(id int64) { types := make([]string, 0) valuesList := make([]hessian.Object, 0) - + types = append(types, "int64") valuesList = append(valuesList, id) - + resp, err := client(address, "api.Server", "GetById", time.Second*3, types, valuesList) - + if err != nil { logger.Errorf("List err=%s", err.Error()) return } v := resp.(*interface{}) vvv := formatData(*v) - + bytes, _ := json.Marshal(vvv) - + logger.Infof("GetById result=%s", string(bytes)) } func formatData(value interface{}) interface{} { - + switch valueTemp := value.(type) { case map[interface{}]interface{}: maps := make(map[string]interface{}) @@ -185,7 +184,7 @@ func formatData(value interface{}) interface{} { return maps case []interface{}: values := make([]interface{}, 0) - + for _, v := range valueTemp { values = append(values, formatData(v)) } diff --git a/example/dubbo2/model/server.go b/example/dubbo2/model/server.go index a5101c4a..201e06d7 100644 --- a/example/dubbo2/model/server.go +++ b/example/dubbo2/model/server.go @@ -1,7 +1,5 @@ package model -import "time" - type Server struct { Id int64 Name string @@ -10,7 +8,7 @@ type Server struct { } type ComplexServer struct { - Addr string - Time time.Time + Addr string + //Time time.Time Server Server } diff --git a/go.mod b/go.mod index 7bd02e9f..ea9e8281 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,11 @@ module github.com/eolinker/apinto -go 1.23 +go 1.23.0 -toolchain go1.23.1 +toolchain go1.23.6 require ( - github.com/Shopify/sarama v1.32.0 + github.com/IBM/sarama v1.45.2 github.com/aws/aws-sdk-go v1.27.0 github.com/brianvoe/gofakeit/v6 v6.20.1 github.com/clbanning/mxj v1.8.4 @@ -29,14 +29,14 @@ require ( github.com/polarismesh/polaris-go v1.1.0 github.com/redis/go-redis/v9 v9.7.0 github.com/robertkrimen/otto v0.0.0-20211024170158-b87d35c0b86f - github.com/sashabaranov/go-openai v1.38.0 + github.com/sashabaranov/go-openai v1.40.5 github.com/soheilhy/cmux v0.1.5 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/traefik/yaegi v0.16.1 github.com/urfave/cli/v2 v2.23.4 github.com/valyala/fasthttp v1.59.0 - golang.org/x/crypto v0.33.0 - golang.org/x/net v0.35.0 + golang.org/x/crypto v0.38.0 + golang.org/x/net v0.40.0 golang.org/x/oauth2 v0.14.0 google.golang.org/api v0.149.0 google.golang.org/grpc v1.61.0 @@ -98,6 +98,7 @@ require ( github.com/natefinch/lumberjack v2.0.0+incompatible // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.7.0 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect @@ -111,7 +112,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel/metric v1.20.0 // indirect - golang.org/x/sync v0.11.0 // indirect + golang.org/x/sync v0.14.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231106174013-bbf56f31fb17 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect @@ -134,8 +135,8 @@ require ( github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/eapache/go-resiliency v1.2.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/fatih/color v1.13.0 // indirect github.com/ghodss/yaml v1.0.0 // indirect @@ -149,19 +150,19 @@ require ( github.com/hashicorp/go-hclog v0.16.2 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect - github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/serf v0.9.5 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.0.0 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/kr/fs v0.1.0 // indirect github.com/mattn/go-colorable v0.1.9 // indirect github.com/mattn/go-isatty v0.0.14 // indirect @@ -170,7 +171,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 // indirect @@ -196,8 +196,8 @@ require ( go.uber.org/atomic v1.9.0 go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.23.0 - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.22.0 + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 golang.org/x/time v0.1.0 // indirect google.golang.org/genproto v0.0.0-20231106174013-bbf56f31fb17 // indirect gopkg.in/sourcemap.v1 v1.0.5 // indirect diff --git a/utils/globalID/uuid.go b/utils/globalID/uuid.go deleted file mode 100644 index 70c891c4..00000000 --- a/utils/globalID/uuid.go +++ /dev/null @@ -1,42 +0,0 @@ -package globalID - -import ( - "encoding/binary" - "encoding/hex" - "strings" - "sync" - "time" -) - -var ( - locker = sync.Mutex{} - lastID int64 - - start int64 -) - -func init() { - s, _ := time.Parse("2020-01-02T15:04:05Z07:00", time.RFC3339) - start = s.UnixNano() -} - -//GenerateID 生成id -func GenerateID() int64 { - id := time.Now().UnixNano() - start - locker.Lock() - defer locker.Unlock() - - for id <= lastID { - id++ - } - lastID = id - return id -} - -//GenerateIDString 生成id字符串 -func GenerateIDString() string { - id := GenerateID() - data := make([]byte, 8) - binary.BigEndian.PutUint64(data, uint64(id)) - return strings.ToUpper(hex.EncodeToString(data)) -} diff --git a/utils/globalID/uuid_test.go b/utils/globalID/uuid_test.go deleted file mode 100644 index a730d32e..00000000 --- a/utils/globalID/uuid_test.go +++ /dev/null @@ -1,26 +0,0 @@ -package globalID - -import ( - "fmt" - "testing" -) - -func TestGenerateIDString(t *testing.T) { - - tests := []struct { - }{ - {}, - {}, - {}, - {}, - {}, - {}, - } - for i := range tests { - t.Run(fmt.Sprintf("GenerateIDString-%d", i), func(t *testing.T) { - t.Logf("GenerateID() = %0X", GenerateID()) - d := GenerateIDString() - t.Logf("GenerateIDString() = %s[%d]", d, len(d)) - }) - } -}