diff --git a/harmony/harmonyparser_test.go b/harmony/harmonyparser_test.go index b988a018f..2c8667bd4 100644 --- a/harmony/harmonyparser_test.go +++ b/harmony/harmonyparser_test.go @@ -1,8 +1,10 @@ package harmony import ( + "encoding/json" "fmt" "reflect" + "strings" "testing" ) @@ -535,3 +537,224 @@ func TestFunctionConvertAndAdd(t *testing.T) { }) } } + +func TestHarmonyMessageHandlerStreamingScenarios(t *testing.T) { + t.Run("thinking_then_content_streams", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + type step struct { + in string + wantContent string + wantThinking string + } + steps := []step{ + {in: "<|channel|>analysis<|message|>Thinking...", wantThinking: "Thinking..."}, + {in: "<|end|>", wantThinking: ""}, + {in: "<|start|>assistant<|message|>Answer", wantContent: "Answer"}, + {in: "<|end|>", wantContent: ""}, + } + for i, s := range steps { + content, thinking, tool := handler.AddContent(s.in, tp) + if tool != "" { + tp.Add(tool) + } + if content != s.wantContent || thinking != s.wantThinking { + t.Fatalf("step %d: got (content=%q thinking=%q), want (content=%q thinking=%q)", i, content, thinking, s.wantContent, s.wantThinking) + } + } + }) + + t.Run("content_streams_as_it_arrives", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|start|>assistant<|message|>Hello", + ", world", + "!<|end|>", + } + var got []string + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if tool != "" { + tp.Add(tool) + } + if thinking != "" { + t.Fatalf("unexpected thinking %q", thinking) + } + if content != "" { + got = append(got, content) + } + } + want := []string{"Hello", ", world", "!"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("content pieces mismatch: got %v want %v", got, want) + } + }) + + t.Run("thinking_streams_separately_from_content", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|channel|>analysis<|message|>Thinking...", + "<|end|>", + "<|start|>assistant<|message|>Answer", + "<|end|>", + } + var got []string + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if tool != "" { + tp.Add(tool) + } + if thinking != "" { + got = append(got, thinking) + } + if content != "" { + got = append(got, content) + } + } + want := []string{"Thinking...", "Answer"} + if !reflect.DeepEqual(got, want) { + t.Fatalf("content pieces mismatch: got %v want %v", got, want) + } + }) + + t.Run("partial_tags_buffer_until_complete", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|chan", + "nel|>analysis<|mess", + "age|>Deep ", + "thought", + "<|end|>", + "<|start|>assistant<|message|>Done", + "<|end|>", + } + var thinkingPieces []string + var contentPieces []string + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if tool != "" { + tp.Add(tool) + } + if thinking != "" { + thinkingPieces = append(thinkingPieces, thinking) + } + if content != "" { + contentPieces = append(contentPieces, content) + } + } + if want := []string{"Deep ", "thought"}; !reflect.DeepEqual(thinkingPieces, want) { + t.Fatalf("thinking pieces mismatch: got %v want %v", thinkingPieces, want) + } + if want := []string{"Done"}; !reflect.DeepEqual(contentPieces, want) { + t.Fatalf("content pieces mismatch: got %v want %v", contentPieces, want) + } + }) + + t.Run("simple_assistant_after_analysis", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|channel|>analysis<|message|>Think", + "<|end|>", + "<|start|>assistant<|message|>Answer", + "<|end|>", + } + var contentSb, thinkingSb strings.Builder + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if tool != "" { + tp.Add(tool) + } + contentSb.WriteString(content) + thinkingSb.WriteString(thinking) + } + if contentSb.String() != "Answer" { + t.Fatalf("content mismatch: got %q want %q", contentSb.String(), "Answer") + } + if thinkingSb.String() != "Think" { + t.Fatalf("thinking mismatch: got %q want %q", thinkingSb.String(), "Think") + } + }) + + t.Run("tool_call_parsed_and_returned_correctly", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|channel|>commentary to=functions.calculate<|message|>{\"expression\":\"2+2\"}<|end|>", + } + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if content != "" || thinking != "" { + continue + } + if tool != "" { + tp.Add(tool) + } + } + name, args := tp.Drain() + if name == nil || *name != "functions.calculate" { + t.Fatalf("unexpected tool name: %v", name) + } + if got, want := args, "{\"expression\":\"2+2\"}"; got != want { + t.Fatalf("unexpected tool args: got %s want %s", got, want) + } + }) + + t.Run("tool_call_across_chunks", func(t *testing.T) { + handler := NewHarmonyMessageHandler() + handler.HarmonyParser.AddImplicitStart() + tp := handler.CreateToolParser() + inputs := []string{ + "<|channel|>commentary to=functions.calculate<|message|>{\"expression\":\"2+", + "2\"}", + "<|end|>", + } + for _, in := range inputs { + content, thinking, tool := handler.AddContent(in, tp) + if content != "" || thinking != "" { + continue + } + if tool != "" { + tp.Add(tool) + } + } + name, args := tp.Drain() + if name == nil || *name != "functions.calculate" { + t.Fatalf("unexpected tool name: %v", name) + } + if got, want := args, "{\"expression\":\"2+2\"}"; got != want { + t.Fatalf("unexpected tool args: got %s want %s", got, want) + } + }) +} + +func TestFunctionNameMapJSONRoundTrip(t *testing.T) { + m := NewFunctionNameMap() + gotConverted := m.ConvertAndAdd("get weather") + if gotConverted == "" { + t.Fatal("conversion returned empty") + } + b, err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + var m2 FunctionNameMap + if err := json.Unmarshal(b, &m2); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if m2.userToHarmony["get weather"] != gotConverted { + t.Fatalf("userToHarmony lost: got %q want %q", m2.userToHarmony["get weather"], gotConverted) + } + if m2.harmonyToUser[gotConverted] != "get weather" { + t.Fatalf("harmonyToUser lost: got %q want %q", m2.harmonyToUser[gotConverted], "get weather") + } +} diff --git a/server/routes_harmony_streaming_test.go b/server/routes_harmony_streaming_test.go index b1ede4e39..bcb020886 100644 --- a/server/routes_harmony_streaming_test.go +++ b/server/routes_harmony_streaming_test.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "encoding/json" - "net/http" "strings" "testing" "time" @@ -118,7 +117,7 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "content streams as it arrives", steps: []step{ { - input: llm.CompletionResponse{Content: "<|message|>Hello", Done: false}, + input: llm.CompletionResponse{Content: "Hello", Done: false}, wantContent: "Hello", }, { @@ -126,7 +125,7 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { wantContent: ", world", }, { - input: llm.CompletionResponse{Content: "!<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, + input: llm.CompletionResponse{Content: "!", Done: true, DoneReason: llm.DoneReasonStop}, wantContent: "!", }, }, @@ -135,20 +134,15 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "thinking streams separately from content", steps: []step{ { - input: llm.CompletionResponse{Content: "<|channel|>analysis<|message|>Thinking...", Done: false}, + input: llm.CompletionResponse{Thinking: "Thinking...", Done: false}, wantThinking: "Thinking...", }, { - input: llm.CompletionResponse{Content: "<|end|>", Done: false}, - // No output expected - just closes the analysis message and resets state to normal + input: llm.CompletionResponse{Content: "Answer", Done: false}, + wantContent: "Answer", }, { - input: llm.CompletionResponse{Content: "<|start|>assistant<|message|>Answer", Done: false}, - wantContent: "Answer", // After message end, state is reset to normal - }, - { - input: llm.CompletionResponse{Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, - // No output expected - just closes the assistant message + input: llm.CompletionResponse{Done: true, DoneReason: llm.DoneReasonStop}, }, }, }, @@ -156,24 +150,16 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "partial tags buffer until complete", steps: []step{ { - input: llm.CompletionResponse{Content: "<|chan", Done: false}, - // No output - partial tag - }, - { - input: llm.CompletionResponse{Content: "nel|>analysis<|mess", Done: false}, - // No output - still building tags - }, - { - input: llm.CompletionResponse{Content: "age|>Deep ", Done: false}, + input: llm.CompletionResponse{Thinking: "Deep ", Done: false}, wantThinking: "Deep ", }, { - input: llm.CompletionResponse{Content: "thought<|end|>", Done: false}, + input: llm.CompletionResponse{Thinking: "thought", Done: false}, wantThinking: "thought", }, { - input: llm.CompletionResponse{Content: "<|start|>assistant<|message|>Done<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, - wantContent: "Done", // After message end, state is reset to normal + input: llm.CompletionResponse{Content: "Done", Done: true, DoneReason: llm.DoneReasonStop}, + wantContent: "Done", }, }, }, @@ -181,7 +167,7 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "simple assistant after analysis", steps: []step{ { - input: llm.CompletionResponse{Content: "<|channel|>analysis<|message|>Think<|end|><|start|>assistant<|message|>Answer<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, + input: llm.CompletionResponse{Thinking: "Think", Content: "Answer", Done: true, DoneReason: llm.DoneReasonStop}, wantContent: "Answer", wantThinking: "Think", }, @@ -191,7 +177,7 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "tool call parsed and returned correctly", steps: []step{ { - input: llm.CompletionResponse{Content: "<|channel|>commentary to=functions.get_weather<|message|>{\"location\":\"San Francisco\"}<|end|><|start|>assistant<|message|>The weather is sunny<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, + input: llm.CompletionResponse{Content: "The weather is sunny", ToolCalls: []api.ToolCall{{Function: api.ToolCallFunction{Name: "get_weather", Arguments: api.ToolCallFunctionArguments{"location": "San Francisco"}}}}, Done: true, DoneReason: llm.DoneReasonStop}, wantContent: "The weather is sunny", wantToolCalls: []api.ToolCall{ { @@ -210,15 +196,10 @@ func TestChatHarmonyParserStreamingRealtime(t *testing.T) { name: "tool call with streaming JSON across chunks", steps: []step{ { - input: llm.CompletionResponse{Content: "<|channel|>commentary to=functions.calculate<|message|>{\"expr", Done: false}, - // No output yet - incomplete JSON + input: llm.CompletionResponse{Done: false}, }, { - input: llm.CompletionResponse{Content: "ession\":\"2+", Done: false}, - // Still no output - incomplete JSON - }, - { - input: llm.CompletionResponse{Content: "2\"}", Done: true}, + input: llm.CompletionResponse{ToolCalls: []api.ToolCall{{Function: api.ToolCallFunction{Name: "calculate", Arguments: api.ToolCallFunctionArguments{"expression": "2+2"}}}}, Done: true}, wantToolCalls: []api.ToolCall{ { Function: api.ToolCallFunction{ @@ -400,9 +381,9 @@ func TestChatHarmonyParserStreamingSimple(t *testing.T) { gin.SetMode(gin.TestMode) mockResponses := []llm.CompletionResponse{ - {Content: "<|message|>First ", Done: false}, + {Content: "First ", Done: false}, {Content: "chunk ", Done: false}, - {Content: "here<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, + {Content: "here", Done: true, DoneReason: llm.DoneReasonStop}, } mock := mockRunner{ @@ -507,189 +488,3 @@ func TestChatHarmonyParserStreamingSimple(t *testing.T) { t.Errorf("expected at least 2 content chunks for streaming, got %d", contentChunks) } } - -func TestChatHarmonyParserStreaming(t *testing.T) { - gin.SetMode(gin.TestMode) - - type expectedChunk struct { - afterResponse int // Which mock response this chunk should appear after - content string // Expected content in this chunk - thinking string // Expected thinking in this chunk - } - - testCases := []struct { - name string - mockResponses []llm.CompletionResponse - expectedChunks []expectedChunk - wantContent string - wantThinking string - }{ - { - name: "simple message without thinking", - mockResponses: []llm.CompletionResponse{ - {Content: "<|start|>assistant<|message|>Hello, ", Done: false}, - {Content: "how can I help?", Done: false}, - {Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, - }, - expectedChunks: []expectedChunk{ - {afterResponse: 1, content: "Hello, "}, - {afterResponse: 2, content: "how can I help?"}, - }, - wantContent: "Hello, how can I help?", - }, - { - name: "message with analysis channel for thinking", - mockResponses: []llm.CompletionResponse{ - {Content: "<|channel|>analysis<|message|>", Done: false}, - {Content: "Let me think ", Done: false}, - {Content: "about this problem...", Done: false}, - {Content: "<|end|>", Done: false}, - {Content: "<|start|>assistant<|message|>", Done: false}, - {Content: "The answer ", Done: false}, - {Content: "is 42", Done: false}, - {Content: "<|end|>", Done: true, DoneReason: llm.DoneReasonStop}, - }, - expectedChunks: []expectedChunk{ - {afterResponse: 2, thinking: "Let me think "}, - {afterResponse: 3, thinking: "about this problem..."}, - {afterResponse: 6, content: "The answer "}, - {afterResponse: 7, content: "is 42"}, - }, - wantContent: "The answer is 42", - wantThinking: "Let me think about this problem...", - }, - { - name: "streaming with partial tags across boundaries", - mockResponses: []llm.CompletionResponse{ - {Content: "<|chan", Done: false}, - {Content: "nel|>analy", Done: false}, - {Content: "sis<|mess", Done: false}, - {Content: "age|>Think", Done: false}, - {Content: "ing deeply...<|end|>", Done: false}, - {Content: "<|start|>assi", Done: false}, - {Content: "stant<|message|>Result ", Done: false}, - {Content: "computed<|e", Done: false}, - {Content: "nd|>", Done: true, DoneReason: llm.DoneReasonStop}, - }, - expectedChunks: []expectedChunk{ - {afterResponse: 4, thinking: "Think"}, - {afterResponse: 5, thinking: "ing deeply..."}, - {afterResponse: 7, content: "Result "}, - {afterResponse: 8, content: "computed"}, - }, - wantContent: "Result computed", - wantThinking: "Thinking deeply...", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Channel to synchronize mock responses with chunk verification - responsesSent := make(chan int, len(tc.mockResponses)) - - mock := mockRunner{ - CompletionFn: func(ctx context.Context, r llm.CompletionRequest, fn func(llm.CompletionResponse)) error { - // Send mock responses one at a time, notifying when each is sent - for i, resp := range tc.mockResponses { - fn(resp) - responsesSent <- i + 1 - } - close(responsesSent) - return nil - }, - } - - s := Server{ - sched: &Scheduler{ - pendingReqCh: make(chan *LlmRequest, 1), - finishedReqCh: make(chan *LlmRequest, 1), - expiredCh: make(chan *runnerRef, 1), - unloadedCh: make(chan any, 1), - loaded: make(map[string]*runnerRef), - newServerFn: newMockServer(&mock), - getGpuFn: discover.GetGPUInfo, - getCpuFn: discover.GetCPUInfo, - reschedDelay: 250 * time.Millisecond, - loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool { - req.successCh <- &runnerRef{ - llama: &mock, - } - return false - }, - }, - } - - go s.sched.Run(t.Context()) - - // Create a minimal model - _, digest := createHarmonyTestModel(t) - - // Create model with passthrough template - stream := false - w := createRequest(t, s.CreateHandler, api.CreateRequest{ - Model: "harmony-test", - Files: map[string]string{"file.gguf": digest}, - Template: `<|start|><|end|>{{ with .Tools }}{{ end }}{{ .Prompt }}`, - Stream: &stream, - }) - - if w.Code != http.StatusOK { - t.Fatalf("failed to create model: %d", w.Code) - } - - // Test chat endpoint with streaming - streamTrue := true - w = createRequest(t, s.ChatHandler, api.ChatRequest{ - Model: "harmony-test", - Messages: []api.Message{{Role: "user", Content: "Hello"}}, - Stream: &streamTrue, - Tools: getTestTools(), - }) - - if w.Code != http.StatusOK { - t.Fatalf("chat request failed: %d - %s", w.Code, w.Body.String()) - } - - // Parse streaming response - var chunks []api.ChatResponse - var content, thinking strings.Builder - - decoder := json.NewDecoder(w.Body) - for decoder.More() { - var chunk api.ChatResponse - if err := decoder.Decode(&chunk); err != nil { - t.Fatalf("failed to decode chunk: %v", err) - } - chunks = append(chunks, chunk) - - // Accumulate content and thinking from each chunk - content.WriteString(chunk.Message.Content) - thinking.WriteString(chunk.Message.Thinking) - - // Debug output - t.Logf("Chunk %d: content=%q thinking=%q done=%v", len(chunks), chunk.Message.Content, chunk.Message.Thinking, chunk.Done) - } - - // Verify we got streaming chunks - if len(chunks) == 0 { - t.Fatal("expected streaming chunks, got none") - } - - gotContent := content.String() - gotThinking := thinking.String() - - if gotContent != tc.wantContent { - t.Errorf("content mismatch: got %q, want %q", gotContent, tc.wantContent) - } - if gotThinking != tc.wantThinking { - t.Errorf("thinking mismatch: got %q, want %q", gotThinking, tc.wantThinking) - } - - // Verify last chunk has done=true - lastChunk := chunks[len(chunks)-1] - if !lastChunk.Done { - t.Error("expected last chunk to have done=true") - } - }) - } -}