package compapi import ( "context" "errors" "fmt" "net/http" "reflect" "sync" "wechat-api/internal/types" "wechat-api/internal/utils/contextkey" "github.com/invopop/jsonschema" "github.com/openai/openai-go" "github.com/openai/openai-go/option" "github.com/openai/openai-go/packages/ssestream" "github.com/zeromicro/go-zero/rest/httpx" ) type ClientConfig struct { ApiKey string ApiBase string } type ClientOption func(*ClientConfig) func WithApiKey(ApiKey string) ClientOption { return func(cfg *ClientConfig) { cfg.ApiKey = ApiKey } } func WithApiBase(ApiBase string) ClientOption { return func(cfg *ClientConfig) { cfg.ApiBase = ApiBase } } type clientActionFace interface { DoRequest(req *types.CompApiReq) (*types.CompOpenApiResp, error) DoRequestStream(req *types.CompApiReq) (*types.CompOpenApiResp, error) BuildRequest(req *types.CompApiReq) error CallbackPrepare(params any) ([]byte, error) } type Client struct { OAC *openai.Client Config ClientConfig ctx context.Context } func NewClient(ctx context.Context, opts ...ClientOption) *Client { client := Client{} for _, opt := range opts { opt(&client.Config) } client.NewOAC() //初始化openai client client.ctx = ctx return &client } // 以下新增加client类型工厂自注册相关函数 // --- Registry Func Type--- type ClientBuilderFunc func(c *Client) (clientActionFace, error) var ( clientRegistry = make(map[string]ClientBuilderFunc) defaultClientType string defaultClientBuilder ClientBuilderFunc registryMutex sync.RWMutex // Protects both registry and default builder ) // RegisterClient remains the same func RegisterClient(clientType string, builder ClientBuilderFunc) error { registryMutex.Lock() defer registryMutex.Unlock() if _, exists := clientRegistry[clientType]; exists { return fmt.Errorf("client type '%s' already registered", clientType) } clientRegistry[clientType] = builder return nil } // RegisterDefaultClient registers a builder function as the default fallback. // Typically called from the init() function of the default client implementation. func RegisterDefaultClient(clientType string, builder ClientBuilderFunc) error { registryMutex.Lock() defer registryMutex.Unlock() if defaultClientBuilder != nil { // Prevent multiple defaults or decide on override behavior return fmt.Errorf("default client type '%s' already registered, cannot register '%s' as default", defaultClientType, clientType) } defaultClientBuilder = builder defaultClientType = clientType // Store the name return nil } // GetClientBuilder remains the same func GetClientBuilder(clientType string) (ClientBuilderFunc, bool) { registryMutex.RLock() defer registryMutex.RUnlock() builder, exists := clientRegistry[clientType] return builder, exists } // GetDefaultClientBuilder retrieves the registered default builder. func GetDefaultClientBuilder() (ClientBuilderFunc, bool) { registryMutex.RLock() defer registryMutex.RUnlock() exists := defaultClientBuilder != nil return defaultClientBuilder, exists } // 根据client生成不同实现了clientActionFace接口的client func (me *Client) GetClientActFace(clientType string) (clientActionFace, error) { // 1. Try to find the specific client type builder, exists := GetClientBuilder(clientType) if exists { return builder(me) } // 2. If not found, try to get the default client builder defaultBuilder, defaultExists := GetDefaultClientBuilder() if defaultExists { return defaultBuilder(me) } // 3. If neither specific nor default exists, return an error return nil, fmt.Errorf("unsupported client type '%s' and no default client registered", clientType) } func (me *Client) NewOAC() { opts := []option.RequestOption{} if len(me.Config.ApiKey) > 0 { opts = append(opts, option.WithAPIKey(me.Config.ApiKey)) } if len(me.Config.ApiBase) > 0 { opts = append(opts, option.WithBaseURL(me.Config.ApiBase)) } oac := openai.NewClient(opts...) me.OAC = &oac } func (me *Client) Callback(clientType string, callbackUrl string, params any) (map[string]any, error) { actFace, err := me.GetClientActFace(clientType) if err != nil { return nil, err } //临时测试 //humanSeeActFaceMake(actFace, clientType, "Callback") var newParams []byte if newParams, err = actFace.CallbackPrepare(params); err != nil { return nil, err } //Post(ctx context.Context, path string, params interface{}, res interface{}, opts ...option.RequestOption) resp := map[string]any{} err = me.OAC.Post(me.ctx, callbackUrl, newParams, &resp) if err != nil { fmt.Printf("Callback Post(%s) By Params:'%s' error\n", callbackUrl, string(newParams)) return nil, err } return resp, nil } func (me *Client) Chat(chatInfo *types.CompApiReq) (*types.CompOpenApiResp, error) { var ( err error actFace clientActionFace apiResp *types.CompOpenApiResp ) actFace, err = me.GetClientActFace(chatInfo.EventType) if err != nil { return nil, err } //临时测试 //humanSeeActFaceMake(actFace, chatInfo.EventType, "Chat") err = actFace.BuildRequest(chatInfo) if err != nil { return nil, err } if chatInfo.Stream { apiResp, err = actFace.DoRequestStream(chatInfo) } else { apiResp, err = actFace.DoRequest(chatInfo) } return apiResp, err } func GenerateSchema[T any]() any { // Structured Outputs uses a subset of JSON schema // These flags are necessary to comply with the subset reflector := jsonschema.Reflector{ AllowAdditionalProperties: false, DoNotReference: true, } var v T schema := reflector.Reflect(v) return schema } func GenerateSchemaFromValue(value any) (any, error) { if value == nil { // 处理 nil 值的情况,根据你的需求决定是返回错误还是空 schema return nil, fmt.Errorf("cannot generate schema from a nil value") } reflector := jsonschema.Reflector{ AllowAdditionalProperties: false, // 根据你的需求设置 DoNotReference: true, // 根据你的需求设置 } // 直接对传入的值进行反射 schema := reflector.Reflect(value) return schema, nil } // 新函数:通过 reflect.Type 生成 Schema func GenerateSchemaByType(t reflect.Type) any { reflector := jsonschema.Reflector{ AllowAdditionalProperties: false, DoNotReference: true, } schema := reflector.ReflectFromType(t) return schema } func getHttpResponseTools(ctx context.Context) (*http.ResponseWriter, *http.Flusher, error) { hw, ok := contextkey.HttpResponseWriterKey.GetValue(ctx) //context取出http.ResponseWriter if !ok { return nil, nil, errors.New("content get http writer err") } flusher, ok := (hw).(http.Flusher) if !ok { return nil, nil, errors.New("streaming unsupported") } return &hw, &flusher, nil } func streamOut(ctx context.Context, res *http.Response) { var ehw http.ResponseWriter hw, flusher, err := getHttpResponseTools(ctx) if err != nil { http.Error(ehw, "Streaming unsupported!", http.StatusInternalServerError) } //获取API返回结果流 chatStream := ssestream.NewStream[ApiRespStreamChunk](ApiRespStreamDecoder(res), err) defer chatStream.Close() //设置流式输出头 http1.1 (*hw).Header().Set("Content-Type", "text/event-stream;charset=utf-8") (*hw).Header().Set("Connection", "keep-alive") (*hw).Header().Set("Cache-Control", "no-cache") for chatStream.Next() { chunk := chatStream.Current() fmt.Fprintf((*hw), "data:%s\n\n", chunk.Data.RAW) (*flusher).Flush() //time.Sleep(1 * time.Millisecond) } fmt.Fprintf((*hw), "data:%s\n\n", "[DONE]") (*flusher).Flush() httpx.Ok((*hw)) } func humanSeeActFaceMake(actFace clientActionFace, clientType string, funcName string) { clientName := "" switch actFace.(type) { case *MismatchClient: clientName = "MismatchClient" case *IntentClient: clientName = "IntentClient" case *FastgptClient: clientName = "FastgptClient" default: clientName = "maybe StdClient" } fmt.Printf("%s.%s() for EventType:'%s'\n", clientName, funcName, clientType) }