diff --git a/server/download.go b/server/download.go index 68836a824..5b02991cf 100644 --- a/server/download.go +++ b/server/download.go @@ -132,16 +132,8 @@ func (p *blobDownloadPart) UnmarshalJSON(b []byte) error { return nil } -// Download tuning. Override via environment variables: -// - Memory constrained: reduce OLLAMA_DOWNLOAD_CONCURRENCY -// - Default uses ~64KB memory regardless of concurrency (streams to disk, hashes from page cache) var ( - // downloadPartSize is the size of each download part. - // Default: 64MB. Override with OLLAMA_DOWNLOAD_PART_SIZE (in MB). - downloadPartSize = int64(getEnvInt("OLLAMA_DOWNLOAD_PART_SIZE", 64)) * format.MegaByte - - // downloadConcurrency limits concurrent part downloads. - // Default: 32. Override with OLLAMA_DOWNLOAD_CONCURRENCY. + downloadPartSize = int64(getEnvInt("OLLAMA_DOWNLOAD_PART_SIZE", 64)) * format.MegaByte downloadConcurrency = getEnvInt("OLLAMA_DOWNLOAD_CONCURRENCY", 32) ) @@ -421,6 +413,34 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis close(hashDone) }() + // Log progress periodically + // Page cache warning: if spread > 4GB, hasher may hit disk instead of RAM + const pageCacheWarningBytes = 4 << 30 // 4GB + progressDone := make(chan struct{}) + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + downloaded := b.Completed.Load() + hashed := sh.Hashed() + dlPct := int(downloaded * 100 / b.Total) + hPct := int(hashed * 100 / b.Total) + spread := dlPct - hPct + spreadBytes := downloaded - hashed + + msg := fmt.Sprintf("progress: downloaded %d%% | hashed %d%% | spread %d%%", dlPct, hPct, spread) + if spreadBytes > pageCacheWarningBytes { + msg += fmt.Sprintf(" [WARNING: %.1fGB ahead, page cache pressure]", float64(spreadBytes)/(1<<30)) + } + slog.Info(msg) + case <-progressDone: + return + } + } + }() + g, inner := errgroup.WithContext(ctx) g.SetLimit(downloadConcurrency) for i := range b.Parts { @@ -462,34 +482,6 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis }) } - // Log progress periodically - // Page cache warning: if spread > 1GB, hasher may hit disk instead of RAM - const pageCacheWarningBytes = 1 << 30 // 1GB - progressDone := make(chan struct{}) - go func() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-ticker.C: - downloaded := b.Completed.Load() - hashed := sh.Hashed() - dlPct := int(downloaded * 100 / b.Total) - hPct := int(hashed * 100 / b.Total) - spread := dlPct - hPct - spreadBytes := downloaded - hashed - - msg := fmt.Sprintf("progress: downloaded %d%% | hashed %d%% | spread %d%%", dlPct, hPct, spread) - if spreadBytes > pageCacheWarningBytes { - msg += fmt.Sprintf(" [WARNING: %.1fGB ahead, page cache pressure]", float64(spreadBytes)/(1<<30)) - } - slog.Info(msg) - case <-progressDone: - return - } - } - }() - if err := g.Wait(); err != nil { close(progressDone) sh.Stop()