Compare commits

..

8 Commits

Author SHA1 Message Date
ParthSareen
f30d01801d routes: update generate handler to use runner with harmony 2025-08-22 16:06:41 -07:00
ParthSareen
b08c7dad0a harmony: add harmony parsing to runner 2025-08-22 15:47:10 -07:00
ParthSareen
bc5ab5784b routes: ChatHandler to get parsed harmony from runner 2025-08-22 15:46:42 -07:00
ParthSareen
92a99e67c7 harmony: simplify prefill, add marshalling for functions, and update harmony check 2025-08-22 15:45:11 -07:00
ParthSareen
05cebf1f21 server: update completion request signature and update token repeat 2025-08-22 15:40:32 -07:00
ParthSareen
51a400ff0f server: add thinking and tool calls to CompletionResponse 2025-08-21 14:50:34 -07:00
ParthSareen
a865b50d9a harmony: move harmony parsing into a package 2025-08-21 12:42:48 -07:00
Daniel Hiltgen
31f64183dc perf: build graph for next batch in parallel to keep GPU busy
This refactors the main run loop of the ollama runner to perform the main GPU
intensive tasks (Compute+Floats) in a go routine so we can prepare the next
batch in parallel to reduce the amount of time the GPU stalls waiting for the
next batch of work.
2025-08-19 12:33:03 -07:00
32 changed files with 779 additions and 886 deletions

View File

@@ -286,23 +286,16 @@ func mapToTypeScriptType(jsonType string) string {
}
}
type ToolFunctionParameters struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]ToolProperty `json:"properties"`
}
func (t *ToolFunctionParameters) String() string {
bts, _ := json.Marshal(t)
return string(bts)
}
type ToolFunction struct {
Name string `json:"name"`
Description string `json:"description"`
Parameters ToolFunctionParameters `json:"parameters"`
Name string `json:"name"`
Description string `json:"description"`
Parameters struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]ToolProperty `json:"properties"`
} `json:"parameters"`
}
func (t *ToolFunction) String() string {

View File

@@ -436,50 +436,3 @@ func TestThinking_UnmarshalJSON(t *testing.T) {
})
}
}
func TestToolFunctionParameters_String(t *testing.T) {
tests := []struct {
name string
params ToolFunctionParameters
expected string
}{
{
name: "simple object with string property",
params: ToolFunctionParameters{
Type: "object",
Required: []string{"name"},
Properties: map[string]ToolProperty{
"name": {
Type: PropertyType{"string"},
Description: "The name of the person",
},
},
},
expected: `{"type":"object","required":["name"],"properties":{"name":{"type":"string","description":"The name of the person"}}}`,
},
{
name: "marshal failure returns empty string",
params: ToolFunctionParameters{
Type: "object",
Defs: func() any {
// Create a cycle that will cause json.Marshal to fail
type selfRef struct {
Self *selfRef
}
s := &selfRef{}
s.Self = s
return s
}(),
Properties: map[string]ToolProperty{},
},
expected: "",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
result := test.params.String()
assert.Equal(t, test.expected, result)
})
}
}

View File

@@ -15,24 +15,19 @@ import (
type gptossModel struct {
ModelParameters
HiddenLayers uint32 `json:"num_hidden_layers"`
MaxPositionEmbeddings uint32 `json:"max_position_embeddings"`
HiddenSize uint32 `json:"hidden_size"`
IntermediateSize uint32 `json:"intermediate_size"`
AttentionHeads uint32 `json:"num_attention_heads"`
KeyValueHeads uint32 `json:"num_key_value_heads"`
HeadDim uint32 `json:"head_dim"`
Experts uint32 `json:"num_experts"`
LocalExperts uint32 `json:"num_local_experts"`
ExpertsPerToken uint32 `json:"experts_per_token"`
RMSNormEpsilon float32 `json:"rms_norm_eps"`
InitialContextLength uint32 `json:"initial_context_length"`
RopeTheta float32 `json:"rope_theta"`
RopeScalingFactor float32 `json:"rope_scaling_factor"`
RopeScaling struct {
Factor float32 `json:"factor"`
} `json:"rope_scaling"`
SlidingWindow uint32 `json:"sliding_window"`
HiddenLayers uint32 `json:"num_hidden_layers"`
HiddenSize uint32 `json:"hidden_size"`
IntermediateSize uint32 `json:"intermediate_size"`
AttentionHeads uint32 `json:"num_attention_heads"`
KeyValueHeads uint32 `json:"num_key_value_heads"`
HeadDim uint32 `json:"head_dim"`
Experts uint32 `json:"num_experts"`
ExpertsPerToken uint32 `json:"experts_per_token"`
RMSNormEpsilon float32 `json:"rms_norm_eps"`
InitialContextLength uint32 `json:"initial_context_length"`
RopeTheta float32 `json:"rope_theta"`
RopeScalingFactor float32 `json:"rope_scaling_factor"`
SlidingWindow uint32 `json:"sliding_window"`
}
var _ ModelConverter = (*gptossModel)(nil)
@@ -41,11 +36,11 @@ func (m *gptossModel) KV(t *Tokenizer) ggml.KV {
kv := m.ModelParameters.KV(t)
kv["general.architecture"] = "gptoss"
kv["general.file_type"] = uint32(4)
kv["gptoss.context_length"] = cmp.Or(m.MaxPositionEmbeddings, uint32(m.RopeScalingFactor*float32(m.InitialContextLength)))
kv["gptoss.context_length"] = uint32(m.RopeScalingFactor * float32(m.InitialContextLength))
kv["gptoss.block_count"] = m.HiddenLayers
kv["gptoss.embedding_length"] = m.HiddenSize
kv["gptoss.feed_forward_length"] = m.IntermediateSize
kv["gptoss.expert_count"] = cmp.Or(m.Experts, m.LocalExperts)
kv["gptoss.expert_count"] = m.Experts
kv["gptoss.expert_used_count"] = m.ExpertsPerToken
kv["gptoss.attention.head_count"] = m.AttentionHeads
kv["gptoss.attention.head_count_kv"] = m.KeyValueHeads
@@ -54,7 +49,7 @@ func (m *gptossModel) KV(t *Tokenizer) ggml.KV {
kv["gptoss.attention.layer_norm_rms_epsilon"] = cmp.Or(m.RMSNormEpsilon, 1e-5)
kv["gptoss.attention.sliding_window"] = m.SlidingWindow
kv["gptoss.rope.freq_base"] = m.RopeTheta
kv["gptoss.rope.scaling.factor"] = cmp.Or(m.RopeScalingFactor, m.RopeScaling.Factor)
kv["gptoss.rope.scaling.factor"] = m.RopeScalingFactor
kv["gptoss.rope.scaling.original_context_length"] = m.InitialContextLength
kv["tokenizer.ggml.bos_token_id"] = uint32(199998) // <|startoftext|>
kv["tokenizer.ggml.add_bos_token"] = false
@@ -97,11 +92,6 @@ func (m *gptossModel) Tensors(ts []Tensor) []*ggml.Tensor {
for name, mxfp4 := range mxfp4s {
dims := mxfp4.blocks.Shape()
if !strings.HasSuffix(name, ".weight") {
name += ".weight"
}
out = append(out, &ggml.Tensor{
Name: name,
Kind: uint32(ggml.TensorTypeMXFP4),
@@ -114,47 +104,25 @@ func (m *gptossModel) Tensors(ts []Tensor) []*ggml.Tensor {
}
func (m *gptossModel) Replacements() []string {
var replacements []string
if m.MaxPositionEmbeddings > 0 {
// hf flavored model
replacements = []string{
"lm_head", "output",
"model.embed_tokens", "token_embd",
"model.layers", "blk",
"input_layernorm", "attn_norm",
"self_attn.q_proj", "attn_q",
"self_attn.k_proj", "attn_k",
"self_attn.v_proj", "attn_v",
"self_attn.o_proj", "attn_out",
"self_attn.sinks", "attn_sinks",
"post_attention_layernorm", "ffn_norm",
"mlp.router", "ffn_gate_inp",
"mlp.experts.gate_up_proj_", "ffn_gate_up_exps.",
"mlp.experts.down_proj_", "ffn_down_exps.",
"model.norm", "output_norm",
}
} else {
replacements = []string{
// noop replacements so other replacements will not be applied
".blocks", ".blocks",
".scales", ".scales",
// real replacements
"block", "blk",
"attn.norm", "attn_norm",
"attn.qkv", "attn_qkv",
"attn.sinks", "attn_sinks",
"attn.out", "attn_out",
"mlp.norm", "ffn_norm",
"mlp.gate", "ffn_gate_inp",
"mlp.mlp1_", "ffn_gate_up_exps.",
"mlp.mlp2_", "ffn_down_exps.",
"embedding", "token_embd",
"norm", "output_norm",
"unembedding", "output",
"scale", "weight",
}
return []string{
// noop replacements so other replacements will not be applied
".blocks", ".blocks",
".scales", ".scales",
// real replacements
"block", "blk",
"attn.norm", "attn_norm",
"attn.qkv", "attn_qkv",
"attn.sinks", "attn_sinks",
"attn.out", "attn_out",
"mlp.norm", "ffn_norm",
"mlp.gate", "ffn_gate_inp",
"mlp.mlp1_", "ffn_gate_up_exps.",
"mlp.mlp2_", "ffn_down_exps.",
"embedding", "token_embd",
"norm", "output_norm",
"unembedding", "output",
"scale", "weight",
}
return replacements
}
type mxfp4 struct {

View File

@@ -2,17 +2,32 @@ package harmony
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"maps"
"slices"
"strings"
"unicode"
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/logutil"
"github.com/ollama/ollama/template"
)
type harmonyParserState int
func ShouldUseHarmony(modelFamily string, template *template.Template) bool {
if slices.Contains([]string{"gptoss", "gpt-oss"}, modelFamily) {
// heuristic to check whether the template expects to be parsed via harmony:
// search for harmony tags that are nearly always used
if template.Contains("<|start|>") && template.Contains("<|end|>") {
return true
}
}
return false
}
const (
harmonyParserState_LookingForMessageStart harmonyParserState = iota
harmonyParserState_ParsingHeader
@@ -76,17 +91,18 @@ func (s *HarmonyParser) AddImplicitStart() {
s.acc.WriteString("<|start|>assistant")
}
func (s *HarmonyParser) AddImplicitStartOrPrefill(lastMessage *api.Message) {
if lastMessage != nil && lastMessage.Role == "assistant" {
// handle prefilling conditions
if lastMessage.Content != "" {
// AddImplicitStartOrPrefill adds content or thinking to the accumulator else adds start tag
func (s *HarmonyParser) AddImplicitStartOrPrefill(prefillContentOrThinking *bool) {
if prefillContentOrThinking != nil {
if *prefillContentOrThinking {
s.acc.WriteString("<|start|>assistant<|channel|>final<|message|>")
return
} else if lastMessage.Thinking != "" {
} else {
s.acc.WriteString("<|start|>assistant<|channel|>analysis<|message|>")
return
}
}
s.AddImplicitStart()
}
@@ -378,6 +394,38 @@ type FunctionNameMap struct {
harmonyToUser map[string]string
}
func (m FunctionNameMap) MarshalJSON() ([]byte, error) {
// necessary to avoid exposing map internals
type alias struct {
UserToHarmony map[string]string `json:"userToHarmony"`
HarmonyToUser map[string]string `json:"harmonyToUser"`
}
return json.Marshal(alias{
UserToHarmony: m.userToHarmony,
HarmonyToUser: m.harmonyToUser,
})
}
func (m *FunctionNameMap) UnmarshalJSON(b []byte) error {
type alias struct {
UserToHarmony map[string]string `json:"userToHarmony"`
HarmonyToUser map[string]string `json:"harmonyToUser"`
}
var a alias
if err := json.Unmarshal(b, &a); err != nil {
return err
}
if m.userToHarmony == nil {
m.userToHarmony = make(map[string]string)
}
if m.harmonyToUser == nil {
m.harmonyToUser = make(map[string]string)
}
maps.Copy(m.userToHarmony, a.UserToHarmony)
maps.Copy(m.harmonyToUser, a.HarmonyToUser)
return nil
}
func NewFunctionNameMap() *FunctionNameMap {
return &FunctionNameMap{
userToHarmony: make(map[string]string),

View File

@@ -2,10 +2,13 @@
This directory contains integration tests to exercise Ollama end-to-end to verify behavior
By default, these tests are disabled so `go test ./...` will exercise only unit tests. To run integration tests you must pass the integration tag. `go test -tags=integration ./...`
By default, these tests are disabled so `go test ./...` will exercise only unit tests. To run integration tests you must pass the integration tag. `go test -tags=integration ./...` Some tests require additional tags to enable to allow scoped testing to keep the duration reasonable. For example, testing a broad set of models requires `-tags=integration,models` and a longer timeout (~60m or more depending on the speed of your GPU.). To view the current set of tag combinations use `find integration -type f | xargs grep "go:build"`
The integration tests have 2 modes of operating.
1. By default, they will start the server on a random port, run the tests, and then shutdown the server.
2. If `OLLAMA_TEST_EXISTING` is set to a non-empty string, the tests will run against an existing running server, which can be remote
2. If `OLLAMA_TEST_EXISTING` is set to a non-empty string, the tests will run against an existing running server, which can be remote based on your `OLLAMA_HOST` environment variable
> [!IMPORTANT]
> Before running the tests locally without the "test existing" setting, compile ollama from the top of the source tree `go build .` in addition to GPU support with cmake if applicable on your platform. The integration tests expect to find an ollama binary at the top of the tree.

View File

@@ -66,7 +66,7 @@ func TestContextExhaustion(t *testing.T) {
DoGenerate(ctx, t, client, req, []string{"once", "upon", "lived"}, 120*time.Second, 10*time.Second)
}
// Send multiple requests with prior context and ensure the response is coherant and expected
// Send multiple generate requests with prior context and ensure the response is coherant and expected
func TestGenerateWithHistory(t *testing.T) {
modelOverride := ollamaEngineChatModels[0] // Most recent ollama engine model
req, resp := GenerateRequests()
@@ -111,5 +111,56 @@ func TestGenerateWithHistory(t *testing.T) {
}(i)
}
wg.Wait()
}
// Send multiple chat requests with prior context and ensure the response is coherant and expected
func TestChatWithHistory(t *testing.T) {
modelOverride := ollamaEngineChatModels[0] // Most recent ollama engine model
req, resp := ChatRequests()
numParallel := 2
iterLimit := 2
softTimeout, hardTimeout := getTimeouts(t)
ctx, cancel := context.WithTimeout(context.Background(), hardTimeout)
defer cancel()
client, _, cleanup := InitServerConnection(ctx, t)
defer cleanup()
// Get the server running (if applicable) warm the model up with a single initial empty request
slog.Info("loading", "model", modelOverride)
err := client.Generate(ctx,
&api.GenerateRequest{Model: modelOverride, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
func(response api.GenerateResponse) error { return nil },
)
if err != nil {
t.Fatalf("failed to load model %s: %s", modelOverride, err)
}
var wg sync.WaitGroup
wg.Add(numParallel)
for i := range numParallel {
go func(i int) {
defer wg.Done()
k := i % len(req)
req[k].Model = modelOverride
for j := 0; j < iterLimit; j++ {
if time.Now().Sub(started) > softTimeout {
slog.Info("exceeded soft timeout, winding down test")
return
}
slog.Info("Starting", "thread", i, "iter", j)
// On slower GPUs it can take a while to process the concurrent requests
// so we allow a much longer initial timeout
assistant := DoChat(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
if assistant == nil {
t.Fatalf("didn't get an assistant response for context")
}
req[k].Messages = append(req[k].Messages,
*assistant,
api.Message{Role: "user", Content: "tell me more!"},
)
}
}(i)
}
wg.Wait()
}

View File

@@ -19,6 +19,8 @@ import (
)
func TestMaxQueue(t *testing.T) {
t.Skip("this test needs to be re-evaluated to use a proper embedding model")
if os.Getenv("OLLAMA_TEST_EXISTING") != "" {
t.Skip("Max Queue test requires spawning a local server so we can adjust the queue size")
return

View File

@@ -567,6 +567,76 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
}
}
func ChatRequests() ([]api.ChatRequest, [][]string) {
genReqs, results := GenerateRequests()
reqs := make([]api.ChatRequest, len(genReqs))
for i := range reqs {
reqs[i].Model = genReqs[i].Model
reqs[i].Stream = genReqs[i].Stream
reqs[i].KeepAlive = genReqs[i].KeepAlive
reqs[i].Messages = []api.Message{
{
Role: "user",
Content: genReqs[i].Prompt,
},
}
}
return reqs, results
}
func DoChat(ctx context.Context, t *testing.T, client *api.Client, req api.ChatRequest, anyResp []string, initialTimeout, streamTimeout time.Duration) *api.Message {
stallTimer := time.NewTimer(initialTimeout)
var buf bytes.Buffer
role := "assistant"
fn := func(response api.ChatResponse) error {
// fmt.Print(".")
role = response.Message.Role
buf.Write([]byte(response.Message.Content))
if !stallTimer.Reset(streamTimeout) {
return errors.New("stall was detected while streaming response, aborting")
}
return nil
}
stream := true
req.Stream = &stream
done := make(chan int)
var genErr error
go func() {
genErr = client.Chat(ctx, &req, fn)
done <- 0
}()
select {
case <-stallTimer.C:
if buf.Len() == 0 {
t.Errorf("generate never started. Timed out after :%s", initialTimeout.String())
} else {
t.Errorf("generate stalled. Response so far:%s", buf.String())
}
case <-done:
if genErr != nil && strings.Contains(genErr.Error(), "model requires more system memory") {
slog.Warn("model is too large for the target test system", "model", req.Model, "error", genErr)
return nil
}
require.NoError(t, genErr, "failed with %s request Messages %s ", req.Model, req.Messages)
// Verify the response contains the expected data
response := buf.String()
atLeastOne := false
for _, resp := range anyResp {
if strings.Contains(strings.ToLower(response), resp) {
atLeastOne = true
break
}
}
require.True(t, atLeastOne, "%s: none of %v found in \"%s\" -- request was:%v", req.Model, anyResp, response, req.Messages)
slog.Info("test pass", "model", req.Model, "messages", req.Messages, "contains", anyResp, "response", response)
case <-ctx.Done():
t.Error("outer test context done while waiting for generate")
}
return &api.Message{Role: role, Content: buf.String()}
}
func skipUnderMinVRAM(t *testing.T, gb uint64) {
// TODO use info API in the future
if s := os.Getenv("OLLAMA_MAX_VRAM"); s != "" {

View File

@@ -378,7 +378,9 @@ func (c *Causal) buildMask(ctx ml.Context) ml.Tensor {
maskTensor := ctx.Input().FromFloatSlice(mask, length, batchSize)
if c.config.MaskDType != ml.DTypeF32 {
maskTensor = maskTensor.Cast(ctx, c.config.MaskDType)
out := ctx.Input().Empty(c.config.MaskDType, maskTensor.Shape()...)
ctx.Forward(maskTensor.Copy(ctx, out))
maskTensor = out
}
return maskTensor

View File

@@ -30,7 +30,7 @@ func pickBestFullFitByLibrary(f *ggml.GGML, modelPath string, projectors []strin
// Try to pack into as few GPUs as possible, starting from 1 GPU
for numGPUs := 1; numGPUs <= len(sgl); numGPUs++ {
gpuSubset := sgl[:numGPUs]
ok, estimatedVRAM := predictServerFit(gpuSubset, f, adapters, projectors, opts, numParallel)
ok, estimatedVRAM := PredictServerFit(gpuSubset, f, adapters, projectors, opts, numParallel)
if ok {
slog.Info("new model will fit in available VRAM across minimum required GPUs, loading",
@@ -48,7 +48,7 @@ func pickBestFullFitByLibrary(f *ggml.GGML, modelPath string, projectors []strin
// - try subsets of GPUs instead of just falling back to 1 or all in a family
// Now try all the GPUS (OLLAMA_SCHED_SPREAD is set)
if ok, estimatedVRAM := predictServerFit(sgl, f, adapters, projectors, opts, numParallel); ok {
if ok, estimatedVRAM := PredictServerFit(sgl, f, adapters, projectors, opts, numParallel); ok {
slog.Info("new model will fit in available VRAM, loading",
"model", modelPath,
"library", sgl[0].Library,
@@ -71,7 +71,7 @@ func pickBestPartialFitByLibrary(f *ggml.GGML, projectors []string, adapters []s
var bestEstimate uint64
var bestFit int
for i, gl := range byLibrary {
_, estimatedVRAM := predictServerFit(gl, f, adapters, projectors, opts, numParallel)
_, estimatedVRAM := PredictServerFit(gl, f, adapters, projectors, opts, numParallel)
if estimatedVRAM > bestEstimate {
bestEstimate = estimatedVRAM
bestFit = i
@@ -81,7 +81,7 @@ func pickBestPartialFitByLibrary(f *ggml.GGML, projectors []string, adapters []s
}
// This algorithm looks for a complete fit to determine if we need to unload other models
func predictServerFit(allGpus discover.GpuInfoList, f *ggml.GGML, adapters, projectors []string, opts api.Options, numParallel int) (bool, uint64) {
func PredictServerFit(allGpus discover.GpuInfoList, f *ggml.GGML, adapters, projectors []string, opts api.Options, numParallel int) (bool, uint64) {
// Split up the GPUs by type and try them
var estimatedVRAM uint64
for _, gpus := range allGpus.ByLibrary() {
@@ -97,10 +97,6 @@ func predictServerFit(allGpus discover.GpuInfoList, f *ggml.GGML, adapters, proj
return true, estimatedVRAM
}
}
if len(gpus) == 1 && gpus[0].Library == "cpu" && estimate.TotalSize <= gpus[0].FreeMemory {
return true, estimatedVRAM
}
}
return false, estimatedVRAM
}

View File

@@ -31,6 +31,7 @@ import (
"github.com/ollama/ollama/envconfig"
"github.com/ollama/ollama/format"
"github.com/ollama/ollama/fs/ggml"
"github.com/ollama/ollama/harmony"
"github.com/ollama/ollama/llama"
"github.com/ollama/ollama/logutil"
"github.com/ollama/ollama/ml"
@@ -492,7 +493,6 @@ func (s *llamaServer) Load(ctx context.Context, gpus discover.GpuInfoList, requi
if !requireFull {
g = pickBestPartialFitByLibrary(s.ggml, []string{s.loadRequest.ProjectorPath}, s.loadRequest.LoraPath, s.options, gpus, s.numParallel)
} else {
slog.Info("model requires more memory than is currently available, evicting a model to make space", "estimate", s.estimate)
return ErrLoadRequiredFull
}
}
@@ -525,6 +525,10 @@ func (s *llamaServer) Load(ctx context.Context, gpus discover.GpuInfoList, requi
}
}
if requireFull && len(gpus) == 1 && gpus[0].Library == "cpu" && s.estimate.TotalSize > gpus[0].FreeMemory {
return ErrLoadRequiredFull
}
slog.Info("offload", "", s.estimate)
s.gpus = gpus
@@ -1328,7 +1332,9 @@ type CompletionRequest struct {
Images []ImageData
Options *api.Options
Grammar string // set before sending the request to the subprocess
Grammar string // set before sending the request to the subprocess
FunctionNameMap *harmony.FunctionNameMap
PrefillContent *bool
}
// DoneReason represents the reason why a completion response is done
@@ -1355,13 +1361,15 @@ func (d DoneReason) String() string {
}
type CompletionResponse struct {
Content string `json:"content"`
DoneReason DoneReason `json:"done_reason"`
Done bool `json:"done"`
PromptEvalCount int `json:"prompt_eval_count"`
PromptEvalDuration time.Duration `json:"prompt_eval_duration"`
EvalCount int `json:"eval_count"`
EvalDuration time.Duration `json:"eval_duration"`
Content string `json:"content"`
Thinking string `json:"thinking"`
ToolCalls []api.ToolCall `json:"tool_calls"`
DoneReason DoneReason `json:"done_reason"`
Done bool `json:"done"`
PromptEvalCount int `json:"prompt_eval_count"`
PromptEvalDuration time.Duration `json:"prompt_eval_duration"`
EvalCount int `json:"eval_count"`
EvalDuration time.Duration `json:"eval_duration"`
}
func (s *llmServer) Completion(ctx context.Context, req CompletionRequest, fn func(CompletionResponse)) error {
@@ -1479,7 +1487,7 @@ func (s *llmServer) Completion(ctx context.Context, req CompletionRequest, fn fu
return fmt.Errorf("error unmarshalling llm prediction response: %v", err)
}
switch {
case strings.TrimSpace(c.Content) == lastToken:
case lastToken != "" && (strings.TrimSpace(c.Content) == lastToken || strings.TrimSpace(c.Thinking) == lastToken):
tokenRepeat++
default:
lastToken = strings.TrimSpace(c.Content)
@@ -1492,16 +1500,14 @@ func (s *llmServer) Completion(ctx context.Context, req CompletionRequest, fn fu
return ctx.Err()
}
if c.Content != "" {
fn(CompletionResponse{
Content: c.Content,
})
}
if c.Done {
fn(c)
return nil
}
if c.Content != "" || c.Thinking != "" || len(c.ToolCalls) > 0 {
fn(c)
}
}
}

View File

@@ -396,11 +396,12 @@ type Tensor interface {
Shape() []int
DType() DType
Cast(ctx Context, dtype DType) Tensor
Bytes() []byte
Floats() []float32
BackendSetFromIntSlice(s []int32)
Neg(ctx Context) Tensor
Add(ctx Context, t2 Tensor) Tensor
Sub(ctx Context, t2 Tensor) Tensor

View File

@@ -82,6 +82,7 @@ type Backend struct {
// to the name that is used by the model definition
tensorLoadTargets map[string][]string
schedMu sync.Mutex // Only one Compute can run at a time
sched C.ggml_backend_sched_t
schedBackends []C.ggml_backend_t
schedBufts []C.ggml_backend_buffer_type_t
@@ -769,6 +770,8 @@ func (c *Context) Forward(tensors ...ml.Tensor) ml.Context {
}
func (c *Context) Compute(tensors ...ml.Tensor) {
c.b.schedMu.Lock()
defer c.b.schedMu.Unlock()
if status := C.ggml_backend_sched_graph_compute_async(c.b.sched, c.graph); status != C.GGML_STATUS_SUCCESS {
panic(fmt.Errorf("error computing ggml graph: %v", status))
}
@@ -843,7 +846,23 @@ func (c *Context) newTensor(dtype ml.DType, shape []int) ml.Tensor {
panic("set Input or Layer before creating tensors")
}
cdtype := ggmlDType(dtype)
var cdtype uint32
switch dtype {
case ml.DTypeF32:
cdtype = C.GGML_TYPE_F32
case ml.DTypeF16:
cdtype = C.GGML_TYPE_F16
case ml.DTypeQ80:
cdtype = C.GGML_TYPE_Q8_0
case ml.DTypeQ40:
cdtype = C.GGML_TYPE_Q4_0
case ml.DTypeI32:
cdtype = C.GGML_TYPE_I32
case ml.DTypeMXFP4:
cdtype = C.GGML_TYPE_MXFP4
default:
panic("unsupported dtype")
}
if len(shape) < 1 || shape[0] == 0 {
var shape C.int64_t = 0
@@ -1021,6 +1040,12 @@ func (t *Tensor) Floats() (data []float32) {
return
}
func (t *Tensor) BackendSetFromIntSlice(s []int32) {
if len(s) > 0 {
C.ggml_backend_tensor_set(t.t, unsafe.Pointer(&s[0]), 0, C.ggml_nbytes(t.t))
}
}
func (t *Tensor) DType() ml.DType {
switch t.t._type {
case C.GGML_TYPE_F32:
@@ -1040,32 +1065,6 @@ func (t *Tensor) DType() ml.DType {
}
}
func ggmlDType(dtype ml.DType) uint32 {
switch dtype {
case ml.DTypeF32:
return C.GGML_TYPE_F32
case ml.DTypeF16:
return C.GGML_TYPE_F16
case ml.DTypeQ80:
return C.GGML_TYPE_Q8_0
case ml.DTypeQ40:
return C.GGML_TYPE_Q4_0
case ml.DTypeI32:
return C.GGML_TYPE_I32
case ml.DTypeMXFP4:
return C.GGML_TYPE_MXFP4
default:
panic("unsupported dtype")
}
}
func (t *Tensor) Cast(ctx ml.Context, dtype ml.DType) ml.Tensor {
return &Tensor{
b: t.b,
t: C.ggml_cast(ctx.(*Context).ctx, t.t, ggmlDType(dtype)),
}
}
func (t *Tensor) Neg(ctx ml.Context) ml.Tensor {
return &Tensor{
b: t.b,

View File

@@ -109,7 +109,7 @@ func (bpe BytePairEncoding) Encode(s string, addSpecial bool) ([]int32, error) {
r = 0x0143
case r <= 0x0020:
r = r + 0x0100
case r >= 0x007f && r <= 0x00a0:
case r >= 0x007e && r <= 0x00a0:
r = r + 0x00a2
}

View File

@@ -207,36 +207,6 @@ func TestLlama(t *testing.T) {
}
}
})
t.Run("roundtriping 0x00-0xFF", func(t *testing.T) {
t.Parallel()
for b := 0x00; b <= 0xFF; b++ {
input := string(rune(b))
ids, err := tokenizer.Encode(input, false)
if err != nil {
t.Errorf("failed to encode rune 0x%02X: %v", b, err)
continue
}
decoded, err := tokenizer.Decode(ids)
if err != nil {
t.Errorf("failed to decode rune 0x%02X: %v", b, err)
continue
}
if b == 0x00 {
if len(decoded) != 0 {
t.Errorf("Decode(Encode(0x00)) should be empty, got %v", ids)
}
continue
}
if decoded != input {
t.Errorf("rune 0x%02X failed roundtrip: got %q, want %q", b, decoded, input)
}
}
})
}
func BenchmarkBytePairEncoding(b *testing.B) {

View File

@@ -64,7 +64,7 @@ type MultimodalProcessor interface {
// This function is also responsible for updating MultimodalHash for any Multimodal
// that is modified to ensure that there is a unique hash value that accurately
// represents the contents.
PostTokenize([]input.Input) ([]input.Input, error)
PostTokenize([]*input.Input) ([]*input.Input, error)
}
// Base implements the common fields and methods for all models
@@ -278,13 +278,13 @@ func canNil(t reflect.Type) bool {
t.Kind() == reflect.Slice
}
func Forward(ctx ml.Context, m Model, inputs []int32, batch input.Batch) (ml.Tensor, error) {
func Forward(ctx ml.Context, m Model, inputs []int32, batch input.Batch) (ml.Tensor, ml.Tensor, error) {
if len(batch.Positions) != len(batch.Sequences) {
return nil, fmt.Errorf("length of positions (%v) must match length of seqs (%v)", len(batch.Positions), len(batch.Sequences))
return nil, nil, fmt.Errorf("length of positions (%v) must match length of seqs (%v)", len(batch.Positions), len(batch.Sequences))
}
if len(batch.Positions) < 1 {
return nil, errors.New("batch size cannot be less than 1")
return nil, nil, errors.New("batch size cannot be less than 1")
}
batch.Inputs = ctx.Input().FromIntSlice(inputs, len(inputs))
@@ -293,16 +293,16 @@ func Forward(ctx ml.Context, m Model, inputs []int32, batch input.Batch) (ml.Ten
if cache != nil {
err := cache.StartForward(ctx, batch, false)
if err != nil {
return nil, err
return nil, nil, err
}
}
t, err := m.Forward(ctx, batch)
if err != nil {
return nil, err
return nil, nil, err
}
ctx.Forward(t).Compute(t)
ctx.Forward(t)
return t, nil
return batch.Inputs, t, nil
}

View File

@@ -112,8 +112,8 @@ func (m *Model) EncodeMultimodal(ctx ml.Context, multimodalData []byte) ([]input
return []input.Multimodal{{Tensor: visionOutputs}}, nil
}
func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
var result []input.Input
func (m *Model) PostTokenize(inputs []*input.Input) ([]*input.Input, error) {
var result []*input.Input
for _, inp := range inputs {
if len(inp.Multimodal) == 0 {
@@ -122,17 +122,17 @@ func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
inputMultimodal := inp.Multimodal[0].Tensor
result = append(result,
input.Input{Token: 108, SameBatch: inputMultimodal.Dim(1) + 3}, // "\n\n"
input.Input{Token: 255999}, // "<start_of_image>""
input.Input{Multimodal: []input.Multimodal{{Tensor: inputMultimodal}}, MultimodalHash: inp.MultimodalHash}, // image data is on the first placeholder
&input.Input{Token: 108, SameBatch: inputMultimodal.Dim(1) + 3}, // "\n\n"
&input.Input{Token: 255999}, // "<start_of_image>""
&input.Input{Multimodal: []input.Multimodal{{Tensor: inputMultimodal}}, MultimodalHash: inp.MultimodalHash}, // image data is on the first placeholder
)
// add image token placeholders
result = append(result, slices.Repeat([]input.Input{{Token: 0}}, inputMultimodal.Dim(1)-1)...)
result = append(result, slices.Repeat([]*input.Input{{Token: 0}}, inputMultimodal.Dim(1)-1)...)
result = append(result,
input.Input{Token: 256000}, // <end_of_image>
input.Input{Token: 108}, // "\n\n"
&input.Input{Token: 256000}, // <end_of_image>
&input.Input{Token: 108}, // "\n\n"
)
}
}

View File

@@ -134,16 +134,16 @@ type separator struct {
y bool
}
func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
var result []input.Input
func (m *Model) PostTokenize(inputs []*input.Input) ([]*input.Input, error) {
var result []*input.Input
for _, inp := range inputs {
if len(inp.Multimodal) == 0 {
result = append(result, inp)
continue
}
var imageInputs []input.Input
imageInputs = append(imageInputs, input.Input{Token: 200080}) // <|image_start|>
var imageInputs []*input.Input
imageInputs = append(imageInputs, &input.Input{Token: 200080}) // <|image_start|>
for i, mm := range inp.Multimodal {
patchesPerChunk := mm.Tensor.Dim(1)
@@ -151,20 +151,20 @@ func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
if i < len(inp.Multimodal)-1 {
separator := mm.Data.(*separator)
imageInputs = append(imageInputs, input.Input{Token: 200092, Multimodal: []input.Multimodal{{Tensor: mm.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: patchesPerChunk}) // <|patch|>
imageInputs = append(imageInputs, slices.Repeat([]input.Input{{Token: 200092}}, patchesPerChunk-1)...)
imageInputs = append(imageInputs, &input.Input{Token: 200092, Multimodal: []input.Multimodal{{Tensor: mm.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: patchesPerChunk}) // <|patch|>
imageInputs = append(imageInputs, slices.Repeat([]*input.Input{{Token: 200092}}, patchesPerChunk-1)...)
if separator.x {
imageInputs = append(imageInputs, input.Input{Token: 200084}) // <|tile_x_separator|>
imageInputs = append(imageInputs, &input.Input{Token: 200084}) // <|tile_x_separator|>
}
if separator.y {
imageInputs = append(imageInputs, input.Input{Token: 200085}) // <|tile_y_separator|>
imageInputs = append(imageInputs, &input.Input{Token: 200085}) // <|tile_y_separator|>
}
} else {
imageInputs = append(imageInputs, input.Input{Token: 200090}) // <|image|>
imageInputs = append(imageInputs, input.Input{Token: 200092, Multimodal: []input.Multimodal{{Tensor: mm.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: patchesPerChunk}) // <|patch|>
imageInputs = append(imageInputs, slices.Repeat([]input.Input{{Token: 200092}}, patchesPerChunk-1)...)
imageInputs = append(imageInputs, input.Input{Token: 200080}) // <|image_end|>
imageInputs = append(imageInputs, &input.Input{Token: 200090}) // <|image|>
imageInputs = append(imageInputs, &input.Input{Token: 200092, Multimodal: []input.Multimodal{{Tensor: mm.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: patchesPerChunk}) // <|patch|>
imageInputs = append(imageInputs, slices.Repeat([]*input.Input{{Token: 200092}}, patchesPerChunk-1)...)
imageInputs = append(imageInputs, &input.Input{Token: 200080}) // <|image_end|>
}
}

View File

@@ -133,22 +133,22 @@ func (m *Model) EncodeMultimodal(ctx ml.Context, multimodalData []byte) ([]input
// [IMG]...[IMG][IMG_BREAK][IMG]...[IMG][IMG_BREAK][IMG]...[IMG][IMG_END]
// Each sequence of [IMG]...[IMG] is a set of patches of vision embeddings
// that can be processed together.
func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
var result []input.Input
func (m *Model) PostTokenize(inputs []*input.Input) ([]*input.Input, error) {
var result []*input.Input
for _, inp := range inputs {
if len(inp.Multimodal) == 0 {
result = append(result, inp)
} else {
for i, row := range inp.Multimodal {
// [IMG]
result = append(result, input.Input{Token: 10, Multimodal: []input.Multimodal{{Tensor: row.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: row.Tensor.Dim(1)})
result = append(result, slices.Repeat([]input.Input{{Token: 10}}, row.Tensor.Dim(1)-1)...)
result = append(result, &input.Input{Token: 10, Multimodal: []input.Multimodal{{Tensor: row.Tensor}}, MultimodalHash: inp.MultimodalHash, SameBatch: row.Tensor.Dim(1)})
result = append(result, slices.Repeat([]*input.Input{{Token: 10}}, row.Tensor.Dim(1)-1)...)
if i == len(inp.Multimodal)-1 {
// [IMG_END]
result = append(result, input.Input{Token: 13})
result = append(result, &input.Input{Token: 13})
} else {
// [IMG_BREAK]
result = append(result, input.Input{Token: 12})
result = append(result, &input.Input{Token: 12})
}
}
}

View File

@@ -90,7 +90,7 @@ func (m *Model) EncodeMultimodal(ctx ml.Context, multimodalData []byte) ([]input
return []input.Multimodal{{Tensor: projectedOutputs}}, nil
}
func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
func (m *Model) PostTokenize(inputs []*input.Input) ([]*input.Input, error) {
for i := range inputs {
if inputs[i].Multimodal != nil {
inputs[i].Token = 128256 // <|image|>

View File

@@ -89,8 +89,8 @@ func (m *Model) EncodeMultimodal(ctx ml.Context, multimodalData []byte) ([]input
}
// PostTokenize arranges Qwen-2.5-VL's inputs for the forward pass
func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
var result []input.Input
func (m *Model) PostTokenize(inputs []*input.Input) ([]*input.Input, error) {
var result []*input.Input
var (
imageToken int32 = 151655
@@ -112,16 +112,16 @@ func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
return nil, fmt.Errorf("failed to encode image prompt: %w", err)
}
for i := range pre {
result = append(result, input.Input{Token: pre[i]})
result = append(result, &input.Input{Token: pre[i]})
}
patchesPerChunk := inp.Multimodal[0].Tensor.Dim(1)
// First add the vision start token
result = append(result, input.Input{Token: visionStartToken})
result = append(result, &input.Input{Token: visionStartToken})
// Add the image token with the multimodal tensor data at the first position
result = append(result, input.Input{
result = append(result, &input.Input{
Token: imageToken,
Multimodal: inp.Multimodal,
MultimodalHash: inp.MultimodalHash,
@@ -129,9 +129,9 @@ func (m *Model) PostTokenize(inputs []input.Input) ([]input.Input, error) {
})
// Add the placeholder tokens for the remaining positions (tokensPerGrid-1)
result = append(result, slices.Repeat([]input.Input{{Token: imageToken}}, patchesPerChunk-1)...)
result = append(result, slices.Repeat([]*input.Input{{Token: imageToken}}, patchesPerChunk-1)...)
result = append(result, input.Input{Token: visionEndToken})
result = append(result, &input.Input{Token: visionEndToken})
}
}

View File

@@ -557,10 +557,12 @@ func fromChatRequest(r ChatCompletionRequest) (*api.ChatRequest, error) {
var think *api.ThinkValue
if r.Reasoning != nil {
options["reasoning"] = *r.Reasoning.Effort
think = &api.ThinkValue{
Value: *r.Reasoning.Effort,
}
} else if r.ReasoningEffort != nil {
options["reasoning"] = *r.ReasoningEffort
think = &api.ThinkValue{
Value: *r.ReasoningEffort,
}

View File

@@ -46,7 +46,7 @@ func NewInputCache(lc *llama.Context, kvSize int, numSlots int, multiUserCache b
}
// Locking: Operations on InputCacheSlot (including finding one
// through LoadCacheSlot) require a lock to be held that serializes
// through LoadCacheSlot) require a lock to be be held that serializes
// these operations with each other and llama.Decode
type InputCacheSlot struct {

View File

@@ -78,7 +78,7 @@ func (c *InputCache) Close() {
}
// Locking: Operations on InputCacheSlot (including finding one
// through LoadCacheSlot) require a lock to be held that serializes
// through LoadCacheSlot) require a lock to be be held that serializes
// these operations with each other and processBatch
type InputCacheSlot struct {
@@ -86,7 +86,7 @@ type InputCacheSlot struct {
Id int
// Inputs that are stored in the KV cache
Inputs []input.Input
Inputs []*input.Input
// is this cache actively being processed as part of a sequence?
InUse bool
@@ -95,7 +95,7 @@ type InputCacheSlot struct {
lastUsed time.Time
}
func (c *InputCache) LoadCacheSlot(prompt []input.Input) (*InputCacheSlot, []input.Input, error) {
func (c *InputCache) LoadCacheSlot(prompt []*input.Input) (*InputCacheSlot, []*input.Input, error) {
var slot *InputCacheSlot
var numPast int32
var err error
@@ -146,7 +146,7 @@ func (c *InputCache) LoadCacheSlot(prompt []input.Input) (*InputCacheSlot, []inp
return slot, prompt, nil
}
func (c *InputCache) findLongestCacheSlot(prompt []input.Input) (*InputCacheSlot, int32, error) {
func (c *InputCache) findLongestCacheSlot(prompt []*input.Input) (*InputCacheSlot, int32, error) {
longest := int32(-1)
var longestSlot *InputCacheSlot
@@ -169,7 +169,7 @@ func (c *InputCache) findLongestCacheSlot(prompt []input.Input) (*InputCacheSlot
return longestSlot, longest, nil
}
func (c *InputCache) findBestCacheSlot(prompt []input.Input) (*InputCacheSlot, int32, error) {
func (c *InputCache) findBestCacheSlot(prompt []*input.Input) (*InputCacheSlot, int32, error) {
oldest := time.Now()
var oldestSlot *InputCacheSlot
@@ -205,7 +205,7 @@ func (c *InputCache) findBestCacheSlot(prompt []input.Input) (*InputCacheSlot, i
if longest > 0 && longestSlot != oldestSlot {
slog.Debug("forking cache slot", "src", longestSlot.Id, "dst", oldestSlot.Id, "inputs", longest, "total",
len(longestSlot.Inputs))
oldestSlot.Inputs = make([]input.Input, longest)
oldestSlot.Inputs = make([]*input.Input, longest)
copy(oldestSlot.Inputs, longestSlot.Inputs[:longest])
if c.cache != nil {
c.cache.CopyPrefix(longestSlot.Id, oldestSlot.Id, longest)
@@ -215,7 +215,7 @@ func (c *InputCache) findBestCacheSlot(prompt []input.Input) (*InputCacheSlot, i
return oldestSlot, longest, nil
}
func countCommonPrefix(a []input.Input, b []input.Input) int32 {
func countCommonPrefix(a []*input.Input, b []*input.Input) int32 {
var count int32
for i := range a {
@@ -250,7 +250,7 @@ func (c *InputCache) ShiftDiscard(inputLen int32, numKeep int32) int32 {
}
type ErrReprocessInputs struct {
Inputs []input.Input
Inputs []*input.Input
}
func (e *ErrReprocessInputs) Error() string {
@@ -283,13 +283,13 @@ func (c *InputCache) ShiftCacheSlot(slot *InputCacheSlot, numKeep int32) error {
"id", slot.Id, "error", err)
// Create new input slice with preserved tokens (numKeep + remaining tokens after discard)
newInputs := make([]input.Input, numKeep+inputLen-(numKeep+discard))
newInputs := make([]*input.Input, numKeep+inputLen-(numKeep+discard))
copy(newInputs[:numKeep], slot.Inputs[:numKeep])
copy(newInputs[numKeep:], slot.Inputs[numKeep+discard:])
// Reset the cache
_ = c.cache.Remove(slot.Id, 0, math.MaxInt32)
slot.Inputs = []input.Input{}
slot.Inputs = []*input.Input{}
// Return error with inputs that need to be reprocessed
return &ErrReprocessInputs{Inputs: newInputs}

View File

@@ -13,50 +13,50 @@ import (
func TestCountCommon(t *testing.T) {
tests := []struct {
name string
t1 []input.Input
t2 []input.Input
t1 []*input.Input
t2 []*input.Input
expected int32
}{
{
name: "Equal",
t1: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
t2: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
t1: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
t2: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
expected: 3,
},
{
name: "Prefix",
t1: []input.Input{{Token: 1}},
t2: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
t1: []*input.Input{{Token: 1}},
t2: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
expected: 1,
},
{
name: "Image Prefix",
t1: []input.Input{{MultimodalHash: 1}},
t2: []input.Input{{MultimodalHash: 1}, {MultimodalHash: 2}, {MultimodalHash: 3}},
t1: []*input.Input{{MultimodalHash: 1}},
t2: []*input.Input{{MultimodalHash: 1}, {MultimodalHash: 2}, {MultimodalHash: 3}},
expected: 1,
},
{
name: "Mixed",
t1: []input.Input{{Token: 1}, {MultimodalHash: 1}},
t2: []input.Input{{Token: 1}, {MultimodalHash: 1}, {Token: 5}},
t1: []*input.Input{{Token: 1}, {MultimodalHash: 1}},
t2: []*input.Input{{Token: 1}, {MultimodalHash: 1}, {Token: 5}},
expected: 2,
},
{
name: "Mixed, Same Length",
t1: []input.Input{{Token: 1}, {MultimodalHash: 1}},
t2: []input.Input{{Token: 1}, {MultimodalHash: 2}},
t1: []*input.Input{{Token: 1}, {MultimodalHash: 1}},
t2: []*input.Input{{Token: 1}, {MultimodalHash: 2}},
expected: 1,
},
{
name: "Empty",
t1: []input.Input{},
t2: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
t1: []*input.Input{},
t2: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
expected: 0,
},
{
name: "Both Empty",
t1: []input.Input{},
t2: []input.Input{},
t1: []*input.Input{},
t2: []*input.Input{},
expected: 0,
},
}
@@ -80,7 +80,7 @@ func TestFindCacheSlot(t *testing.T) {
tests := []struct {
name string
cache InputCache
prompt []input.Input
prompt []*input.Input
longest expected
best expected
}{
@@ -89,18 +89,18 @@ func TestFindCacheSlot(t *testing.T) {
cache: InputCache{slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Time{},
},
{
Id: 1,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Time{},
},
}},
prompt: []input.Input{{Token: 1}},
prompt: []*input.Input{{Token: 1}},
longest: expected{result: 0, len: 0},
best: expected{result: 0, len: 0},
},
@@ -109,18 +109,18 @@ func TestFindCacheSlot(t *testing.T) {
cache: InputCache{slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}},
Inputs: []*input.Input{{Token: 1}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-2 * time.Second),
},
}},
prompt: []input.Input{{Token: 1}, {Token: 2}},
prompt: []*input.Input{{Token: 1}, {Token: 2}},
longest: expected{result: 1, len: 2},
best: expected{result: 1, len: 2},
},
@@ -129,18 +129,18 @@ func TestFindCacheSlot(t *testing.T) {
cache: InputCache{slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Time{},
},
}},
prompt: []input.Input{{Token: 2}},
prompt: []*input.Input{{Token: 2}},
longest: expected{result: 0, len: 0},
best: expected{result: 1, len: 0},
},
@@ -150,19 +150,19 @@ func TestFindCacheSlot(t *testing.T) {
slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Time{},
},
},
},
prompt: []input.Input{{Token: 1}},
prompt: []*input.Input{{Token: 1}},
longest: expected{result: 0, len: 1},
best: expected{result: 1, len: 1},
},
@@ -171,18 +171,18 @@ func TestFindCacheSlot(t *testing.T) {
cache: InputCache{slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}},
Inputs: []*input.Input{{Token: 1}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-2 * time.Second),
},
}},
prompt: []input.Input{{Token: 2}, {Token: 3}},
prompt: []*input.Input{{Token: 2}, {Token: 3}},
longest: expected{result: 0, len: 0},
best: expected{result: 1, len: 0},
},
@@ -191,18 +191,18 @@ func TestFindCacheSlot(t *testing.T) {
cache: InputCache{slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: true,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{{Token: 1}},
Inputs: []*input.Input{{Token: 1}},
InUse: false,
lastUsed: time.Now().Add(-2 * time.Second),
},
}},
prompt: []input.Input{{Token: 1}, {Token: 2}},
prompt: []*input.Input{{Token: 1}, {Token: 2}},
longest: expected{result: 1, len: 1},
best: expected{result: 1, len: 2},
},
@@ -300,7 +300,7 @@ func TestLoadCacheSlot(t *testing.T) {
tests := []struct {
name string
cache InputCache
prompt []input.Input
prompt []*input.Input
wantErr bool
expectedSlotId int
expectedPrompt int // expected length of remaining prompt
@@ -312,19 +312,19 @@ func TestLoadCacheSlot(t *testing.T) {
slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Now().Add(-2 * time.Second),
},
},
},
prompt: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
prompt: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
wantErr: false,
expectedSlotId: 0,
expectedPrompt: 1, // Only token 3 remains
@@ -336,19 +336,19 @@ func TestLoadCacheSlot(t *testing.T) {
slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
{
Id: 1,
Inputs: []input.Input{},
Inputs: []*input.Input{},
InUse: false,
lastUsed: time.Now().Add(-2 * time.Second),
},
},
},
prompt: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
prompt: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
wantErr: false,
expectedSlotId: 0,
expectedPrompt: 1, // Only token 3 remains
@@ -360,13 +360,13 @@ func TestLoadCacheSlot(t *testing.T) {
slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: false,
lastUsed: time.Now().Add(-time.Second),
},
},
},
prompt: []input.Input{{Token: 1}, {Token: 2}},
prompt: []*input.Input{{Token: 1}, {Token: 2}},
wantErr: false,
expectedSlotId: 0,
expectedPrompt: 1, // Should leave 1 token for sampling
@@ -378,13 +378,13 @@ func TestLoadCacheSlot(t *testing.T) {
slots: []InputCacheSlot{
{
Id: 0,
Inputs: []input.Input{{Token: 1}, {Token: 2}},
Inputs: []*input.Input{{Token: 1}, {Token: 2}},
InUse: true,
lastUsed: time.Now().Add(-time.Second),
},
},
},
prompt: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
prompt: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}},
wantErr: true,
expectedSlotId: -1,
expectedPrompt: -1,
@@ -452,7 +452,7 @@ func TestShiftCacheSlot(t *testing.T) {
tests := []struct {
name string
numCtx int32
inputs []input.Input
inputs []*input.Input
numKeep int32
cacheErr bool
wantErr any
@@ -461,7 +461,7 @@ func TestShiftCacheSlot(t *testing.T) {
{
name: "Normal shift",
numCtx: 10,
inputs: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}, {Token: 4}, {Token: 5}, {Token: 6}, {Token: 7}, {Token: 8}, {Token: 9}, {Token: 10}},
inputs: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}, {Token: 4}, {Token: 5}, {Token: 6}, {Token: 7}, {Token: 8}, {Token: 9}, {Token: 10}},
numKeep: 2,
cacheErr: false, // No error
wantErr: nil,
@@ -470,7 +470,7 @@ func TestShiftCacheSlot(t *testing.T) {
{
name: "Cache removal fails",
numCtx: 10,
inputs: []input.Input{{Token: 1}, {Token: 2}, {Token: 3}, {Token: 4}, {Token: 5}, {Token: 6}, {Token: 7}, {Token: 8}, {Token: 9}, {Token: 10}},
inputs: []*input.Input{{Token: 1}, {Token: 2}, {Token: 3}, {Token: 4}, {Token: 5}, {Token: 6}, {Token: 7}, {Token: 8}, {Token: 9}, {Token: 10}},
numKeep: 2,
cacheErr: true,
wantErr: &ErrReprocessInputs{},
@@ -487,7 +487,7 @@ func TestShiftCacheSlot(t *testing.T) {
}
slot := &InputCacheSlot{
Id: 123,
Inputs: make([]input.Input, len(tt.inputs)),
Inputs: make([]*input.Input, len(tt.inputs)),
}
copy(slot.Inputs, tt.inputs)

View File

@@ -17,6 +17,7 @@ import (
"reflect"
"regexp"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
@@ -28,6 +29,7 @@ import (
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/envconfig"
"github.com/ollama/ollama/harmony"
"github.com/ollama/ollama/llm"
"github.com/ollama/ollama/logutil"
"github.com/ollama/ollama/ml"
@@ -51,10 +53,10 @@ type Sequence struct {
iBatch int
// prompt inputs left to evaluate
inputs []input.Input
inputs []*input.Input
// inputs that have been added to a batch but not yet submitted to Forward
pendingInputs []input.Input
pendingInputs []*input.Input
// tokens that have been generated but not returned yet (e.g. for stop sequences)
pendingResponses []string
@@ -86,6 +88,12 @@ type Sequence struct {
// true if an embedding are to be returned instead of text generation
embeddingOnly bool
// true if the sequence if finished and marked for removal on next pass
finished bool
// True if we have to skip this sequence to shift the cache
skipForShift bool
doneReason llm.DoneReason
// Metrics
@@ -182,8 +190,8 @@ func (s *Server) NewSequence(prompt string, images []llm.ImageData, params NewSe
// inputs processes the prompt and images into a list of inputs
// by splitting the prompt on [img-<n>] tags, tokenizing text and
// decoding images
func (s *Server) inputs(prompt string, images []llm.ImageData) ([]input.Input, []ml.Context, multimodalStore, error) {
var inputs []input.Input
func (s *Server) inputs(prompt string, images []llm.ImageData) ([]*input.Input, []ml.Context, multimodalStore, error) {
var inputs []*input.Input
var ctxs []ml.Context
var mmStore multimodalStore
@@ -210,7 +218,7 @@ func (s *Server) inputs(prompt string, images []llm.ImageData) ([]input.Input, [
}
for _, t := range tokens {
inputs = append(inputs, input.Input{Token: t})
inputs = append(inputs, &input.Input{Token: t})
}
// image - decode and store
@@ -243,7 +251,7 @@ func (s *Server) inputs(prompt string, images []llm.ImageData) ([]input.Input, [
mmStore.addMultimodal(imageEmbeddings)
inputs = append(inputs, input.Input{Multimodal: imageEmbeddings, MultimodalHash: imageHash})
inputs = append(inputs, &input.Input{Multimodal: imageEmbeddings, MultimodalHash: imageHash})
postTokenize = true
}
}
@@ -259,6 +267,27 @@ func (s *Server) inputs(prompt string, images []llm.ImageData) ([]input.Input, [
return inputs, ctxs, mmStore, nil
}
type batchState struct {
id int
ctx ml.Context
modelInput ml.Tensor
modelOutput ml.Tensor
batchInputs []*input.Input
batch input.Batch
seqs []*Sequence // full set of seqs at the time this batch was initiated
initSeqIdx int // The initial value for the set of sequences evaluated (s.nextSeq - 1)
// Signaled when this batches inputs are ready and compute can proceed
inputsReadyCh chan struct{}
// Signaling when Compute is about to begin on this batch, and
// seqs have been updated to prepare for the next batch
computeStartedCh chan struct{}
// Signaled when this batches outputs are complete and the next batch can proceed
outputsReadyCh chan struct{}
}
type Server struct {
// modelPath is the location of the model to be loaded
modelPath string
@@ -290,6 +319,16 @@ type Server struct {
// TODO (jmorganca): make this n_batch
batchSize int
// Used to signal a hard failure during async processing which will panic the runner
hardErrCh chan error
// A prior batch that's still being processed
// only read or written by forwardBatch
pendingBatch *batchState
// Simple counter used only for trace logging batches
batchID int
// protects access to everything below this line
// this is context state needed for decoding
mu sync.Mutex
@@ -350,45 +389,132 @@ func flushPending(seq *Sequence) bool {
}
}
func (s *Server) removeSequence(seqIndex int, reason llm.DoneReason) {
func (s *Server) finishSequence(seqIndex int, reason llm.DoneReason) {
seq := s.seqs[seqIndex]
// finish could be called multiple times since we prepare 1 batch ahead
// and multiple scenarios can lead to finishing a sequence
// ensure only the first finish called is processed
if seq.finished {
return
}
flushPending(seq)
seq.doneReason = reason
seq.finished = true
close(seq.responses)
close(seq.embedding)
seq.cache.InUse = false
}
func (s *Server) removeFinishedSequence(seqIndex int) {
s.seqs[seqIndex] = nil
s.seqsSem.Release(1)
}
// track batch state between forwardBatch, computeBatch and predictForwardBatch
func (s *Server) run(ctx context.Context) {
s.ready.Wait()
var bs *batchState
for {
select {
case <-ctx.Done():
return
case err := <-s.hardErrCh:
panic(err)
default:
err := s.processBatch()
var err error
bs, err = s.forwardBatch()
if err != nil {
panic(err)
}
if bs == nil {
continue
}
go s.computeBatch(bs)
}
}
}
func (s *Server) processBatch() error {
// forwardBatch will calculate a batch.
func (s *Server) forwardBatch() (*batchState, error) {
inputsReady := false
var inputsReadyCh chan struct{}
// If we have a pending batch still processing, wait until Compute has started
// before setting up the next batch so the seqs inputs are ready to receive their
// token values and we get the correct input pointers for the batchInputs
if s.pendingBatch != nil {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch waiting for compute to start", "pendingBatch.id", s.pendingBatch.id)
<-s.pendingBatch.computeStartedCh
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch compute started, setting up next batch", "pendingBatch.id", s.pendingBatch.id, "id", s.batchID)
inputsReadyCh = s.pendingBatch.outputsReadyCh // Chain the ouputs from the pending batch to the next inputs batch
} else {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch no pending batch detected", "batchID", s.batchID)
inputsReady = true // No pendingBatch, so the inputs will be ready in the seqs immediately
inputsReadyCh = make(chan struct{}, 1)
}
s.mu.Lock()
for s.allNil() {
s.cond.Wait() // Wait until an item is added
}
defer s.mu.Unlock()
ctx := s.model.Backend().NewContext()
defer ctx.Close()
// If new sequences have been added with an active batch we delay preparing the next batch
// until Compute has finished
if s.pendingBatch != nil {
for seqIdx := range s.seqs {
if s.seqs[seqIdx] != s.pendingBatch.seqs[seqIdx] {
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch seqs changed, waiting for compute to finish to pick up new sequence(s)", "pendingBatch.id", s.pendingBatch.id)
s.mu.Unlock() // release the lock so computeBatch can finish up
<-s.pendingBatch.outputsReadyCh
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch pending batch outputs ready", "pendingBatch.id", s.pendingBatch.id)
s.mu.Lock()
inputsReady = true // pendingBatch completed, so the inputs are ready in the seqs
break
}
}
}
// Clear pending Batch - we'll set it if we have a batch with any inputs
s.pendingBatch = nil
var batchInputs []int32
// Remove any finished sequences before recording the active set of seqs in the batch
for seqIdx := range s.seqs {
seq := s.seqs[seqIdx]
if seq == nil {
continue
}
if seq.finished {
s.removeFinishedSequence(seqIdx)
continue
}
if seq.numPredict > 0 && seq.numPredicted >= seq.numPredict {
s.finishSequence(seqIdx, llm.DoneReasonLength)
s.removeFinishedSequence(seqIdx)
continue
}
}
// next batch
nb := &batchState{
id: s.batchID,
initSeqIdx: s.nextSeq - 1,
seqs: make([]*Sequence, len(s.seqs)),
inputsReadyCh: inputsReadyCh,
computeStartedCh: make(chan struct{}, 1),
outputsReadyCh: make(chan struct{}, 1),
}
ctx := s.model.Backend().NewContext()
nb.ctx = ctx
// Record the sequences at the time we create the batch so we can detect if new sequences are added on the next pass
copy(nb.seqs, s.seqs)
// Prepare the seqs and batch, but defer the input token values as we may not be ready yet
var batchInputs []*input.Input
var batch input.Batch
resumeSeq := -1
@@ -396,20 +522,13 @@ func (s *Server) processBatch() error {
for range s.seqs {
seqIdx = (seqIdx + 1) % len(s.seqs)
seq := s.seqs[seqIdx]
if seq == nil {
continue
}
// if past the num predict limit
if seq.numPredict > 0 && seq.numPredicted >= seq.numPredict {
s.removeSequence(seqIdx, llm.DoneReasonLength)
continue
}
if !s.cache.enabled {
seq.inputs = append(seq.cache.Inputs, seq.inputs...)
seq.cache.Inputs = []input.Input{}
seq.cache.Inputs = []*input.Input{}
}
batchSize := s.batchSize
@@ -449,18 +568,21 @@ func (s *Server) processBatch() error {
// Prepend these inputs to the sequence's inputs queue for reprocessing
seq.inputs = append(reprocess.Inputs, seq.inputs...)
// Skip this sequence but continue processing the rest
seq.skipForShift = true // cleared in computeBatch below for the next batch
continue
} else {
return err
ctx.Close()
return nil, err
}
}
}
batchInputs = append(batchInputs, inp.Token)
batchInputs = append(batchInputs, seq.inputs[i])
if inp.Multimodal != nil {
mm, err := seq.mmStore.getMultimodal(s.model.Backend(), ctx, inp.Multimodal, false)
if err != nil {
return err
ctx.Close()
return nil, err
}
batch.Multimodal = append(batch.Multimodal, input.MultimodalIndex{Index: len(batchInputs) - 1, Multimodal: mm})
}
@@ -468,10 +590,13 @@ func (s *Server) processBatch() error {
batch.Positions = append(batch.Positions, int32(len(seq.cache.Inputs)+len(seq.pendingInputs)))
batch.Sequences = append(batch.Sequences, seq.cache.Id)
// TODO BUG HERE!!!
// Somehow sometimes iBatch isn't set correctly
seq.iBatch = len(batch.Outputs)
if i+1 == len(seq.inputs) {
batch.Outputs = append(batch.Outputs, int32(len(batchInputs)-1))
}
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch iBatch", "batchID", s.batchID, "seqIdx", seqIdx, "seq.iBatch", seq.iBatch, "i+1", i+1, "len(seq.inputs)", len(seq.inputs))
seq.pendingInputs = append(seq.pendingInputs, inp)
}
@@ -485,36 +610,138 @@ func (s *Server) processBatch() error {
}
if len(batchInputs) == 0 {
return nil
slog.Log(context.TODO(), logutil.LevelTrace, "forwardBatch no batchInputs, going idle", "batchID", s.batchID)
ctx.Close()
return nil, nil
}
s.batchID++
modelOutput, err := model.Forward(ctx, s.model, batchInputs, batch)
var err error
// Actual batchInputs values will be injected into the modelInput tensor before calling Compute
nb.modelInput, nb.modelOutput, err = model.Forward(ctx, s.model, make([]int32, len(batchInputs)), batch)
if err != nil {
return fmt.Errorf("failed to decode batch: %w", err)
ctx.Close()
return nil, fmt.Errorf("failed to build graph: %w", err)
}
nb.batchInputs = batchInputs
nb.batch = batch
// computeBatch will close the context in the batch upon completion
s.pendingBatch = nb
if inputsReady {
nb.inputsReadyCh <- struct{}{}
}
logits := modelOutput.Floats()
return nb, nil
}
// Async processing of the next batch
func (s *Server) computeBatch(bs *batchState) {
if bs == nil || bs.ctx == nil {
// Nothing to compute
return
}
defer bs.ctx.Close()
// Wait until inputs are ready
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: waiting for inputs to be ready", "batchID", bs.id)
<-bs.inputsReadyCh
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: inputs are ready", "batchID", bs.id)
// Once we complete, signal the next batch of inputs are ready
// This will unblock the next computeBatch, or forwardBatch if new seqs come in
defer func() {
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: outputs are ready", "batchID", bs.id)
bs.outputsReadyCh <- struct{}{}
}()
s.mu.Lock()
// Gather the actual input token values now that they're ready
batchInputs := make([]int32, len(bs.batchInputs))
for i := range batchInputs {
batchInputs[i] = bs.batchInputs[i].Token
}
// TODO the following logic could be run in a go routine to possibly speed up getting to Compute
// Now we run part of the decoding algorithm to adjust the seq.inputs with placeholder tokens
// so that forwardBatch can build a batchInputs set which will eventually contain the actual
// decoded tokens.
promptProcessing := make([]bool, len(s.seqs)) // track seq's we skip
nextBatchTokens := make([]*input.Input, len(s.seqs))
iBatches := make([]int, len(s.seqs)) // Record the iBatch values before releasing the lock
for i, seq := range s.seqs {
iBatches[i] = -1
if seq == nil {
continue
}
// Skip over any newly added sequences
if bs.seqs[i] == nil {
continue
}
// After calling Forward, pending inputs are now in the cache
if len(seq.pendingInputs) > 0 {
seq.cache.Inputs = append(seq.cache.Inputs, seq.pendingInputs...)
seq.pendingInputs = []input.Input{}
seq.pendingInputs = []*input.Input{}
}
// don't sample prompt processing
if len(seq.inputs) != 0 {
if !s.cache.enabled {
return errors.New("caching disabled but unable to fit entire input in a batch")
s.hardErrCh <- fmt.Errorf("caching disabled but unable to fit entire input in a batch")
return
}
// Record so we can skip during Decode
promptProcessing[i] = true
continue
}
seq.numPredicted++
nextToken := &input.Input{Token: 0} // placeholder we'll fill in after Compute/Floats
seq.inputs = []*input.Input{nextToken}
nextBatchTokens[i] = nextToken
iBatches[i] = seq.iBatch
}
// At this point the seqs are ready for forwardBatch to move forward so unblock
s.mu.Unlock()
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: signaling computeStartedCh", "batchID", bs.id)
bs.computeStartedCh <- struct{}{}
bs.modelInput.BackendSetFromIntSlice(batchInputs)
bs.ctx.Compute(bs.modelOutput)
logits := bs.modelOutput.Floats()
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: logits ready", "batchID", bs.id)
s.mu.Lock()
defer s.mu.Unlock()
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: decoding", "batchID", bs.id)
for i, seq := range s.seqs {
if seq == nil {
continue
}
// Skip over any newly added sequences
if bs.seqs[i] == nil {
continue
}
// Detect if the sequence we're processing has already been completed and replaced
// with a new sequence
if seq != bs.seqs[i] {
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: sequence replaced, discarding its results", "batchID", bs.id, "seqIdx", i)
continue
}
// don't sample prompt processing
if promptProcessing[i] {
continue
}
if seq.numPredicted == 1 {
seq.startGenerationTime = time.Now()
}
@@ -522,35 +749,46 @@ func (s *Server) processBatch() error {
// if done processing the prompt, generate an embedding and return
if seq.embeddingOnly {
// TODO(jessegross): Embedding support
slog.Warn("generation of embedding outputs not yet supported")
s.removeSequence(i, llm.DoneReasonStop)
slog.Warn("generation of embedding outputs not yet supported", "id", bs.id, "seqIdx", i)
s.finishSequence(i, llm.DoneReasonStop)
continue
}
// sample a token
vocabSize := len(logits) / len(batch.Outputs)
token, err := seq.sampler.Sample(logits[seq.iBatch*vocabSize : (seq.iBatch+1)*vocabSize])
vocabSize := len(logits) / len(bs.batch.Outputs)
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: vocab details", "batchID", bs.id, "seqIdx", i, "len(logits)", len(logits), "len(bs.batch.Outputs)", len(bs.batch.Outputs), "vocabSize", vocabSize, "seq.iBatch", seq.iBatch)
token, err := seq.sampler.Sample(logits[iBatches[i]*vocabSize : (iBatches[i]+1)*vocabSize])
if err != nil {
return fmt.Errorf("failed to sample token: %w", err)
s.hardErrCh <- fmt.Errorf("failed to sample token: %w", err)
return
}
nextBatchTokens[i].Token = token
// if it's an end of sequence token, break
if s.model.(model.TextProcessor).Is(token, model.SpecialEOS) {
// TODO (jmorganca): we should send this back
// as it's important for the /api/generate context
// seq.responses <- piece
s.removeSequence(i, llm.DoneReasonStop)
slog.Log(context.TODO(), logutil.LevelTrace, "computeBatch: EOS", "batchID", bs.id, "seqIdx", i)
s.finishSequence(i, llm.DoneReasonStop)
continue
}
piece, err := s.model.(model.TextProcessor).Decode([]int32{token})
if err != nil {
return err
s.hardErrCh <- fmt.Errorf("failed to decode token: %w", err)
return
}
seq.inputs = []input.Input{{Token: token}}
if nextBatchTokens[i] == nil {
slog.Error("batch corrupted", "id", bs.id, "batch", bs.batch, "seqIdx", i, "seq", seq)
s.hardErrCh <- fmt.Errorf("expected a single token during decode")
return
}
// fill in the final selected token value to replace the placeholder in the next batch
// nextBatchTokensWritten++
seq.pendingResponses = append(seq.pendingResponses, piece)
sequence := strings.Join(seq.pendingResponses, "")
@@ -575,9 +813,10 @@ func (s *Server) processBatch() error {
if tokenTruncated || origLen == newLen {
tokenLen--
}
seq.cache.Inputs = seq.cache.Inputs[:tokenLen]
s.removeSequence(i, llm.DoneReasonStop)
s.finishSequence(i, llm.DoneReasonStop)
continue
}
@@ -590,11 +829,9 @@ func (s *Server) processBatch() error {
}
if !flushPending(seq) {
s.removeSequence(i, llm.DoneReasonConnectionClosed)
s.finishSequence(i, llm.DoneReasonConnectionClosed)
}
}
return nil
}
func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
@@ -604,6 +841,15 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
return
}
var harmonyMessageHandler *harmony.HarmonyMessageHandler
var harmonyToolParser *harmony.HarmonyToolCallAccumulator
if req.FunctionNameMap != nil {
harmonyMessageHandler = harmony.NewHarmonyMessageHandler()
harmonyMessageHandler.FunctionNameMap = req.FunctionNameMap
harmonyMessageHandler.HarmonyParser.AddImplicitStartOrPrefill(req.PrefillContent)
harmonyToolParser = harmonyMessageHandler.CreateToolParser()
}
if req.Options == nil {
opts := api.DefaultOptions()
req.Options = &opts
@@ -694,8 +940,16 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
return
case content, ok := <-seq.responses:
if ok {
var thinking string
if harmonyMessageHandler != nil {
var toolContent string
content, thinking, toolContent = harmonyMessageHandler.AddContent(content, harmonyToolParser)
harmonyToolParser.Add(toolContent)
}
if err := json.NewEncoder(w).Encode(&llm.CompletionResponse{
Content: content,
Content: content,
Thinking: thinking,
}); err != nil {
http.Error(w, fmt.Sprintf("failed to encode response: %v", err), http.StatusInternalServerError)
close(seq.quit)
@@ -704,7 +958,29 @@ func (s *Server) completion(w http.ResponseWriter, r *http.Request) {
flusher.Flush()
} else {
var toolCalls []api.ToolCall
if harmonyMessageHandler != nil {
toolName, toolContent := harmonyToolParser.Drain()
if toolName != nil {
*toolName = strings.TrimPrefix(*toolName, "functions.")
*toolName = harmonyMessageHandler.FunctionNameMap.OriginalFromConverted(*toolName)
var args api.ToolCallFunctionArguments
if err := json.Unmarshal([]byte(toolContent), &args); err != nil {
http.Error(w, fmt.Sprintf("failed to unmarshal tool call function arguments: %v", err), http.StatusInternalServerError)
close(seq.quit)
return
}
toolCalls = append(toolCalls, api.ToolCall{
Function: api.ToolCallFunction{
Name: *toolName,
Arguments: args,
},
})
}
}
if err := json.NewEncoder(w).Encode(&llm.CompletionResponse{
ToolCalls: toolCalls,
Done: true,
DoneReason: seq.doneReason,
PromptEvalCount: seq.numPromptInputs,
@@ -736,7 +1012,10 @@ func (s *Server) reserveWorstCaseGraph() error {
defer ctx.Close()
var err error
inputs := make([]input.Input, s.batchSize)
inputs := make([]*input.Input, s.batchSize)
for i := range inputs {
inputs[i] = &input.Input{}
}
mmStore := newMultimodalStore()
// Multimodal strategy:
@@ -778,8 +1057,11 @@ func (s *Server) reserveWorstCaseGraph() error {
}
if len(inputs) < s.batchSize {
newInputs := make([]input.Input, s.batchSize)
newInputs := make([]*input.Input, s.batchSize)
copy(newInputs, inputs)
for i := len(inputs); i < s.batchSize; i++ {
newInputs[i] = &input.Input{}
}
inputs = newInputs
}
}
@@ -842,6 +1124,7 @@ func (s *Server) allocModel(
// Convert memory allocation panics to errors
defer func() {
if r := recover(); r != nil {
debug.PrintStack()
if err, ok := r.(error); ok {
panicErr = err
} else {
@@ -1011,6 +1294,7 @@ func Execute(args []string) error {
server := &Server{
modelPath: *mpath,
status: llm.ServerStatusLaunched,
hardErrCh: make(chan error, 1),
}
server.cond = sync.NewCond(&server.mu)

View File

@@ -46,18 +46,6 @@ import (
"github.com/ollama/ollama/version"
)
func shouldUseHarmony(model *Model) bool {
if slices.Contains([]string{"gptoss", "gpt-oss"}, model.Config.ModelFamily) {
// heuristic to check whether the template expects to be parsed via harmony:
// search for harmony tags that are nearly always used
if model.Template.Contains("<|start|>") && model.Template.Contains("<|end|>") {
return true
}
}
return false
}
func experimentEnabled(name string) bool {
return slices.Contains(strings.Split(os.Getenv("OLLAMA_EXPERIMENT"), ","), name)
}
@@ -207,14 +195,7 @@ func (s *Server) GenerateHandler(c *gin.Context) {
return
}
useHarmony := shouldUseHarmony(m) && !req.Raw
var harmonyMessageHandler *harmony.HarmonyMessageHandler
var harmonyToolParser *harmony.HarmonyToolCallAccumulator
if useHarmony {
harmonyMessageHandler = harmony.NewHarmonyMessageHandler()
harmonyMessageHandler.HarmonyParser.AddImplicitStart()
harmonyToolParser = harmonyMessageHandler.CreateToolParser()
}
useHarmony := harmony.ShouldUseHarmony(m.Config.ModelFamily, m.Template) && !req.Raw
// Validate Think value: string values currently only allowed for gptoss models
if req.Think != nil && req.Think.IsString() && !useHarmony {
@@ -375,12 +356,7 @@ func (s *Server) GenerateHandler(c *gin.Context) {
},
}
if useHarmony {
content, thinking, toolContent := harmonyMessageHandler.AddContent(cr.Content, harmonyToolParser)
res.Response = content
res.Thinking = thinking
harmonyToolParser.Add(toolContent)
} else if thinkingState != nil {
if !useHarmony && thinkingState != nil {
thinking, content := thinkingState.AddContent(cr.Content)
res.Thinking = thinking
res.Response = content
@@ -391,26 +367,6 @@ func (s *Server) GenerateHandler(c *gin.Context) {
}
if cr.Done {
if useHarmony {
toolName, toolContent := harmonyToolParser.Drain()
if toolName != nil {
*toolName = strings.TrimPrefix(*toolName, "functions.")
var args api.ToolCallFunctionArguments
if err := json.Unmarshal([]byte(toolContent), &args); err != nil {
errStr := fmt.Sprintf("error parsing tool call: raw='%s', err=%s", toolContent, err.Error())
ch <- gin.H{"error": errStr}
return
}
res.ToolCalls = append(res.ToolCalls, api.ToolCall{
Function: api.ToolCallFunction{
Name: *toolName,
Arguments: args,
},
})
}
}
res.DoneReason = cr.DoneReason.String()
res.TotalDuration = time.Since(checkpointStart)
res.LoadDuration = checkpointLoaded.Sub(checkpointStart)
@@ -1616,27 +1572,36 @@ func (s *Server) ChatHandler(c *gin.Context) {
}
msgs = filterThinkTags(msgs, m)
var harmonyMessageHandler *harmony.HarmonyMessageHandler
var harmonyToolParser *harmony.HarmonyToolCallAccumulator
useHarmony := shouldUseHarmony(m)
useHarmony := harmony.ShouldUseHarmony(m.Config.ModelFamily, m.Template)
processedTools := req.Tools
var functionNameMap *harmony.FunctionNameMap
var prefillContentOrThinking *bool
if useHarmony {
harmonyMessageHandler = harmony.NewHarmonyMessageHandler()
functionNameMap = harmony.NewFunctionNameMap()
var lastMessage *api.Message
if len(msgs) > 0 {
lastMessage = &msgs[len(msgs)-1]
}
harmonyMessageHandler.HarmonyParser.AddImplicitStartOrPrefill(lastMessage)
harmonyToolParser = harmonyMessageHandler.CreateToolParser()
// prefill content or thinking flag if the last message is an assistant message
if lastMessage != nil && lastMessage.Role == "assistant" {
if lastMessage.Content != "" {
trueVal := true
// true sets content to be prefilled
prefillContentOrThinking = &trueVal
} else if lastMessage.Thinking != "" {
// false sets thinking to be prefilled
falseVal := false
prefillContentOrThinking = &falseVal
}
}
// make a copy of tools to pass to the chat prompt. Function names may be
// renamed to be valid Harmony function names.
processedTools = make([]api.Tool, len(req.Tools))
copy(processedTools, req.Tools)
for i, tool := range processedTools {
processedTools[i].Function.Name = harmonyMessageHandler.FunctionNameMap.ConvertAndAdd(tool.Function.Name)
processedTools[i].Function.Name = functionNameMap.ConvertAndAdd(tool.Function.Name)
}
}
@@ -1673,10 +1638,6 @@ func (s *Server) ChatHandler(c *gin.Context) {
OpeningTag: openingTag,
ClosingTag: closingTag,
}
if strings.HasSuffix(strings.TrimSpace(prompt), openingTag) {
thinkingState.AddContent(openingTag)
}
}
var toolParser *tools.Parser
@@ -1689,15 +1650,17 @@ func (s *Server) ChatHandler(c *gin.Context) {
defer close(ch)
if err := r.Completion(c.Request.Context(), llm.CompletionRequest{
Prompt: prompt,
Images: images,
Format: req.Format,
Options: opts,
Prompt: prompt,
Images: images,
Format: req.Format,
Options: opts,
FunctionNameMap: functionNameMap,
PrefillContent: prefillContentOrThinking,
}, func(r llm.CompletionResponse) {
res := api.ChatResponse{
Model: req.Model,
CreatedAt: time.Now().UTC(),
Message: api.Message{Role: "assistant", Content: r.Content},
Message: api.Message{Role: "assistant", Content: r.Content, Thinking: r.Thinking, ToolCalls: r.ToolCalls},
Done: r.Done,
Metrics: api.Metrics{
PromptEvalCount: r.PromptEvalCount,
@@ -1713,31 +1676,10 @@ func (s *Server) ChatHandler(c *gin.Context) {
}
if useHarmony {
content, thinking, toolContent := harmonyMessageHandler.AddContent(r.Content, harmonyToolParser)
res.Message.Content = content
res.Message.Thinking = thinking
harmonyToolParser.Add(toolContent)
if r.Done {
toolName, toolContent := harmonyToolParser.Drain()
if toolName != nil {
*toolName = strings.TrimPrefix(*toolName, "functions.")
*toolName = harmonyMessageHandler.FunctionNameMap.OriginalFromConverted(*toolName)
var args api.ToolCallFunctionArguments
if err := json.Unmarshal([]byte(toolContent), &args); err != nil {
errStr := fmt.Sprintf("error parsing tool call: raw='%s', err=%s", toolContent, err.Error())
ch <- gin.H{"error": errStr}
return
}
res.Message.ToolCalls = []api.ToolCall{{Function: api.ToolCallFunction{Name: *toolName, Arguments: args}}}
}
}
// only send messages with meaningful content (empty messages confuse clients)
if res.Message.Content != "" || res.Message.Thinking != "" || len(res.Message.ToolCalls) > 0 || res.Done {
ch <- res
}
return
}

View File

@@ -969,233 +969,3 @@ func TestGenerate(t *testing.T) {
}
})
}
func TestChatWithPromptEndingInThinkTag(t *testing.T) {
gin.SetMode(gin.TestMode)
// Helper to create a standard thinking test setup
setupThinkingTest := func(t *testing.T) (*mockRunner, *Server) {
mock := &mockRunner{
CompletionResponse: llm.CompletionResponse{
Done: true,
DoneReason: llm.DoneReasonStop,
PromptEvalCount: 1,
PromptEvalDuration: 1,
EvalCount: 1,
EvalDuration: 1,
},
}
s := &Server{
sched: &Scheduler{
pendingReqCh: make(chan *LlmRequest, 1),
finishedReqCh: make(chan *LlmRequest, 1),
expiredCh: make(chan *runnerRef, 1),
unloadedCh: make(chan any, 1),
loaded: make(map[string]*runnerRef),
newServerFn: newMockServer(mock),
getGpuFn: discover.GetGPUInfo,
getCpuFn: discover.GetCPUInfo,
reschedDelay: 250 * time.Millisecond,
loadFn: func(req *LlmRequest, _ *ggml.GGML, _ discover.GpuInfoList, _ bool) bool {
time.Sleep(time.Millisecond)
req.successCh <- &runnerRef{llama: mock}
return false
},
},
}
go s.sched.Run(t.Context())
// Create a model with thinking support
_, digest := createBinFile(t, ggml.KV{
"general.architecture": "llama",
"llama.block_count": uint32(1),
"llama.context_length": uint32(8192),
"llama.embedding_length": uint32(4096),
"llama.attention.head_count": uint32(32),
"llama.attention.head_count_kv": uint32(8),
"tokenizer.ggml.tokens": []string{""},
"tokenizer.ggml.scores": []float32{0},
"tokenizer.ggml.token_type": []int32{0},
}, []*ggml.Tensor{
{Name: "token_embd.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_norm.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_down.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_gate.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_up.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.ffn_norm.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_k.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_output.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_q.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "blk.0.attn_v.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
{Name: "output.weight", Shape: []uint64{1}, WriterTo: bytes.NewReader(make([]byte, 4))},
})
// Create model with thinking template that adds <think> at the end
w := createRequest(t, s.CreateHandler, api.CreateRequest{
Model: "test-thinking",
Files: map[string]string{"file.gguf": digest},
Template: `{{- range .Messages }}
{{- if eq .Role "user" }}user: {{ .Content }}
{{ else if eq .Role "assistant" }}assistant: {{ if .Thinking }}<think>{{ .Thinking }}</think>{{ end }}{{ .Content }}
{{ end }}{{ end }}<think>`,
Stream: &stream,
})
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
return mock, s
}
mock, s := setupThinkingTest(t)
// Helper to test chat responses
testChatRequest := func(t *testing.T, name string, userContent string, modelResponse string, expectedThinking string, expectedContent string, think bool) {
t.Run(name, func(t *testing.T) {
mock.CompletionResponse = llm.CompletionResponse{
Content: modelResponse,
Done: true,
DoneReason: llm.DoneReasonStop,
PromptEvalCount: 1,
PromptEvalDuration: 1,
EvalCount: 1,
EvalDuration: 1,
}
mock.CompletionFn = nil
streamRequest := false
req := api.ChatRequest{
Model: "test-thinking",
Messages: []api.Message{
{Role: "user", Content: userContent},
},
Stream: &streamRequest,
}
if think {
req.Think = &api.ThinkValue{Value: think}
}
w := createRequest(t, s.ChatHandler, req)
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
var resp api.ChatResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatal(err)
}
if resp.Message.Thinking != expectedThinking {
t.Errorf("expected thinking %q, got %q", expectedThinking, resp.Message.Thinking)
}
if resp.Message.Content != expectedContent {
t.Errorf("expected content %q, got %q", expectedContent, resp.Message.Content)
}
})
}
// Test cases - Note: Template adds <think> at the end, and leading whitespace after <think> is eaten by the parser
testChatRequest(t, "basic thinking response",
"Help me solve this problem",
" Let me think about this step by step... </think> The answer is 42.",
"Let me think about this step by step... ",
"The answer is 42.",
true)
testChatRequest(t, "thinking with multiple sentences",
"Explain quantum computing",
" First, I need to understand the basics. Quantum bits can be in superposition. </think> Quantum computing uses quantum mechanics principles.",
"First, I need to understand the basics. Quantum bits can be in superposition. ",
"Quantum computing uses quantum mechanics principles.",
true)
testChatRequest(t, "no thinking content",
"What is 2+2?",
"</think> The answer is 4.",
"",
"The answer is 4.",
true)
testChatRequest(t, "thinking disabled but template still adds think tag",
"Simple question",
" My thoughts </think> The answer.",
"",
" My thoughts </think> The answer.",
false)
// Test streaming response with template-added <think>
t.Run("streaming with thinking", func(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
mock.CompletionFn = func(ctx context.Context, r llm.CompletionRequest, fn func(r llm.CompletionResponse)) error {
defer wg.Done()
// Verify the prompt ends with <think> due to template
if !strings.HasSuffix(r.Prompt, "<think>") {
t.Errorf("expected prompt to end with <think>, got: %q", r.Prompt)
}
// Simulate streaming chunks
responses := []llm.CompletionResponse{
{Content: " I need to consider", Done: false, PromptEvalCount: 1, PromptEvalDuration: 1},
{Content: " multiple factors here...", Done: false, PromptEvalCount: 1, PromptEvalDuration: 1},
{Content: " </think> Based on my analysis,", Done: false, PromptEvalCount: 1, PromptEvalDuration: 1},
{Content: " the solution is straightforward.", Done: true, DoneReason: llm.DoneReasonStop, PromptEvalCount: 1, PromptEvalDuration: 1, EvalCount: 1, EvalDuration: 1},
}
for _, resp := range responses {
select {
case <-ctx.Done():
return ctx.Err()
default:
fn(resp)
time.Sleep(10 * time.Millisecond)
}
}
return nil
}
think := true
w := createRequest(t, s.ChatHandler, api.ChatRequest{
Model: "test-thinking",
Messages: []api.Message{{Role: "user", Content: "Analyze this complex problem"}},
Think: &api.ThinkValue{Value: think},
Stream: &stream,
})
wg.Wait()
if w.Code != http.StatusOK {
t.Fatalf("expected status 200, got %d", w.Code)
}
// Parse streaming responses
decoder := json.NewDecoder(w.Body)
var allThinking, allContent strings.Builder
for {
var resp api.ChatResponse
if err := decoder.Decode(&resp); err == io.EOF {
break
} else if err != nil {
t.Fatal(err)
}
allThinking.WriteString(resp.Message.Thinking)
allContent.WriteString(resp.Message.Content)
}
// Note: Leading whitespace after <think> is eaten by the parser
if got := allThinking.String(); got != "I need to consider multiple factors here... " {
t.Errorf("expected thinking %q, got %q", "I need to consider multiple factors here... ", got)
}
if got := allContent.String(); got != "Based on my analysis, the solution is straightforward." {
t.Errorf("expected content %q, got %q", "Based on my analysis, the solution is straightforward.", got)
}
})
}

View File

@@ -103,9 +103,7 @@ func eat(s *Parser) (string, string, bool) {
// note that we use the original content, not the trimmed one because we
// don't want to eat any whitespace in the real content if there were no
// thinking tags
untrimmed := s.acc.String()
s.acc.Reset()
return "", untrimmed, false
return "", s.acc.String(), false
}
case thinkingState_ThinkingStartedEatingWhitespace:
trimmed := strings.TrimLeftFunc(s.acc.String(), unicode.IsSpace)

View File

@@ -58,15 +58,6 @@ func TestThinkingStreaming(t *testing.T) {
wantContent: " abc",
wantStateAfter: thinkingState_ThinkingDone,
},
// regression test for a bug where we were transitioning directly to
// ThinkingDone without clearing the buffer. This would cuase the first
// step to be outputted twice
{
input: "def",
wantThinking: "",
wantContent: "def",
wantStateAfter: thinkingState_ThinkingDone,
},
},
},
{

View File

@@ -224,45 +224,22 @@ func findArguments(buffer []byte) (map[string]any, int) {
return nil, 0
}
start := -1
var braces int
var inString, escaped bool
for i := range buffer {
c := buffer[i]
if escaped {
escaped = false
continue
}
if c == '\\' {
escaped = true
continue
}
if c == '"' {
inString = !inString
continue
}
if inString {
continue
}
var start int = -1
for i, c := range buffer {
if c == '{' {
if braces == 0 {
start = i
}
braces++
} else if c == '}' {
} else if c == '}' && braces > 0 {
braces--
if braces == 0 && start != -1 {
object := buffer[start : i+1]
var data map[string]any
if err := json.Unmarshal(object, &data); err != nil {
// not a valid object, keep looking
start = -1
continue
}
@@ -305,10 +282,6 @@ func findArguments(buffer []byte) (map[string]any, int) {
return data, i
}
if braces < 0 {
braces = 0
}
}
}

View File

@@ -1,7 +1,6 @@
package tools
import (
"strings"
"testing"
"text/template"
@@ -41,7 +40,13 @@ func TestParser(t *testing.T) {
Function: api.ToolFunction{
Name: "get_temperature",
Description: "Retrieve the temperature for a given location",
Parameters: api.ToolFunctionParameters{
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Required: []string{"city"},
Properties: map[string]api.ToolProperty{
@@ -63,7 +68,13 @@ func TestParser(t *testing.T) {
Function: api.ToolFunction{
Name: "get_conditions",
Description: "Retrieve the current weather conditions for a given location",
Parameters: api.ToolFunctionParameters{
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Properties: map[string]api.ToolProperty{
"location": {
@@ -93,7 +104,13 @@ func TestParser(t *testing.T) {
Function: api.ToolFunction{
Name: "get_address",
Description: "Get the address of a given location",
Parameters: api.ToolFunctionParameters{
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Properties: map[string]api.ToolProperty{
"location": {
@@ -109,7 +126,13 @@ func TestParser(t *testing.T) {
Function: api.ToolFunction{
Name: "add",
Description: "Add two numbers",
Parameters: api.ToolFunctionParameters{
Parameters: struct {
Type string `json:"type"`
Defs any `json:"$defs,omitempty"`
Items any `json:"items,omitempty"`
Required []string `json:"required"`
Properties map[string]api.ToolProperty `json:"properties"`
}{
Type: "object",
Properties: map[string]api.ToolProperty{
"a": {
@@ -1117,163 +1140,11 @@ func TestFindArguments(t *testing.T) {
},
{
name: "deepseek",
buffer: []byte(`"arguments": {"location": "Tokyo"}}</tool_call>`),
buffer: []byte(`", "arguments": {"location": "Tokyo"}}</tool_call>`),
want: map[string]any{
"location": "Tokyo",
},
},
{
name: "string with braces",
buffer: []byte(`{"name": "process_code", "arguments": {"code": "if (x > 0) { return true; }"}}`),
want: map[string]any{
"code": "if (x > 0) { return true; }",
},
},
{
name: "string with nested json",
buffer: []byte(`{"name": "send_data", "arguments": {"payload": "{\"nested\": {\"key\": \"value\"}}"}}`),
want: map[string]any{
"payload": `{"nested": {"key": "value"}}`,
},
},
{
name: "string with escaped quotes and braces",
buffer: []byte(`{"name": "analyze", "arguments": {"text": "The JSON is: {\"key\": \"val{ue}\"}"}}`),
want: map[string]any{
"text": `The JSON is: {"key": "val{ue}"}`,
},
},
{
name: "multiple objects with string containing braces",
buffer: []byte(`{"name": "test", "arguments": {"query": "find } in text"}} {"name": "other"}`),
want: map[string]any{
"query": "find } in text",
},
},
{
name: "unmatched closing brace in string",
buffer: []byte(`{"name": "search", "arguments": {"pattern": "regex: }"}}`),
want: map[string]any{
"pattern": "regex: }",
},
},
{
name: "complex nested with mixed braces",
buffer: []byte(`{"name": "analyze", "arguments": {"data": "{\"items\": [{\"value\": \"}\"}, {\"code\": \"if (x) { return y; }\"}]}"}}`),
want: map[string]any{
"data": `{"items": [{"value": "}"}, {"code": "if (x) { return y; }"}]}`,
},
},
{
name: "string with newline and braces",
buffer: []byte(`{"name": "format", "arguments": {"template": "{\n \"key\": \"value\"\n}"}}`),
want: map[string]any{
"template": "{\n \"key\": \"value\"\n}",
},
},
{
name: "string with unicode escape",
buffer: []byte(`{"name": "test", "arguments": {"text": "Unicode: \u007B and \u007D"}}`),
want: map[string]any{
"text": "Unicode: { and }",
},
},
{
name: "array arguments",
buffer: []byte(`{"name": "batch", "arguments": ["item1", "item2", "{\"nested\": true}"]}`),
want: nil, // This should return nil because arguments is not a map
},
{
name: "escaped backslash before quote",
buffer: []byte(`{"name": "path", "arguments": {"dir": "C:\\Program Files\\{App}\\"}}`),
want: map[string]any{
"dir": `C:\Program Files\{App}\`,
},
},
{
name: "single quotes not treated as string delimiters",
buffer: []byte(`{"name": "query", "arguments": {"sql": "SELECT * FROM users WHERE name = '{admin}'"}}`),
want: map[string]any{
"sql": "SELECT * FROM users WHERE name = '{admin}'",
},
},
{
name: "incomplete json at buffer end",
buffer: []byte(`{"name": "test", "arguments": {"data": "some {"`),
want: nil,
},
{
name: "multiple escaped quotes",
buffer: []byte(`{"name": "echo", "arguments": {"msg": "He said \"Hello {World}\" loudly"}}`),
want: map[string]any{
"msg": `He said "Hello {World}" loudly`,
},
},
{
name: "json with comments style string",
buffer: []byte(`{"name": "code", "arguments": {"snippet": "// This is a comment with { and }"}}`),
want: map[string]any{
"snippet": "// This is a comment with { and }",
},
},
{
name: "consecutive escaped backslashes",
buffer: []byte(`{"name": "test", "arguments": {"path": "C:\\\\{folder}\\\\"}}`),
want: map[string]any{
"path": `C:\\{folder}\\`,
},
},
{
name: "empty string with braces after",
buffer: []byte(`{"name": "test", "arguments": {"a": "", "b": "{value}"}}`),
want: map[string]any{
"a": "",
"b": "{value}",
},
},
{
name: "unicode in key names",
buffer: []byte(`{"name": "test", "arguments": {"key{": "value", "key}": "value2"}}`),
want: map[string]any{
"key{": "value",
"key}": "value2",
},
},
{
name: "very long string with braces",
buffer: []byte(`{"name": "test", "arguments": {"data": "` + strings.Repeat("a{b}c", 100) + `"}}`),
want: map[string]any{
"data": strings.Repeat("a{b}c", 100),
},
},
{
name: "tab characters and braces",
buffer: []byte(`{"name": "test", "arguments": {"code": "\tif (true) {\n\t\treturn;\n\t}"}}`),
want: map[string]any{
"code": "\tif (true) {\n\t\treturn;\n\t}",
},
},
{
name: "null byte in string",
buffer: []byte(`{"name": "test", "arguments": {"data": "before\u0000{after}"}}`),
want: map[string]any{
"data": "before\x00{after}",
},
},
{
name: "escaped quote at end of string",
buffer: []byte(`{"name": "test", "arguments": {"data": "text with quote at end\\\""}}`),
want: map[string]any{
"data": `text with quote at end\"`,
},
},
{
name: "mixed array and object in arguments",
buffer: []byte(`{"name": "test", "arguments": {"items": ["{", "}", {"key": "value"}]}}`),
want: map[string]any{
"items": []any{"{", "}", map[string]any{"key": "value"}},
},
},
}
for _, tt := range tests {