better logging
This commit is contained in:
parent
12ff2d1461
commit
49393385ca
|
|
@ -132,16 +132,8 @@ func (p *blobDownloadPart) UnmarshalJSON(b []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Download tuning. Override via environment variables:
|
||||
// - Memory constrained: reduce OLLAMA_DOWNLOAD_CONCURRENCY
|
||||
// - Default uses ~64KB memory regardless of concurrency (streams to disk, hashes from page cache)
|
||||
var (
|
||||
// downloadPartSize is the size of each download part.
|
||||
// Default: 64MB. Override with OLLAMA_DOWNLOAD_PART_SIZE (in MB).
|
||||
downloadPartSize = int64(getEnvInt("OLLAMA_DOWNLOAD_PART_SIZE", 64)) * format.MegaByte
|
||||
|
||||
// downloadConcurrency limits concurrent part downloads.
|
||||
// Default: 32. Override with OLLAMA_DOWNLOAD_CONCURRENCY.
|
||||
downloadConcurrency = getEnvInt("OLLAMA_DOWNLOAD_CONCURRENCY", 32)
|
||||
)
|
||||
|
||||
|
|
@ -421,6 +413,34 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
|
|||
close(hashDone)
|
||||
}()
|
||||
|
||||
// Log progress periodically
|
||||
// Page cache warning: if spread > 4GB, hasher may hit disk instead of RAM
|
||||
const pageCacheWarningBytes = 4 << 30 // 4GB
|
||||
progressDone := make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
downloaded := b.Completed.Load()
|
||||
hashed := sh.Hashed()
|
||||
dlPct := int(downloaded * 100 / b.Total)
|
||||
hPct := int(hashed * 100 / b.Total)
|
||||
spread := dlPct - hPct
|
||||
spreadBytes := downloaded - hashed
|
||||
|
||||
msg := fmt.Sprintf("progress: downloaded %d%% | hashed %d%% | spread %d%%", dlPct, hPct, spread)
|
||||
if spreadBytes > pageCacheWarningBytes {
|
||||
msg += fmt.Sprintf(" [WARNING: %.1fGB ahead, page cache pressure]", float64(spreadBytes)/(1<<30))
|
||||
}
|
||||
slog.Info(msg)
|
||||
case <-progressDone:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
g, inner := errgroup.WithContext(ctx)
|
||||
g.SetLimit(downloadConcurrency)
|
||||
for i := range b.Parts {
|
||||
|
|
@ -462,34 +482,6 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
|
|||
})
|
||||
}
|
||||
|
||||
// Log progress periodically
|
||||
// Page cache warning: if spread > 1GB, hasher may hit disk instead of RAM
|
||||
const pageCacheWarningBytes = 1 << 30 // 1GB
|
||||
progressDone := make(chan struct{})
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
downloaded := b.Completed.Load()
|
||||
hashed := sh.Hashed()
|
||||
dlPct := int(downloaded * 100 / b.Total)
|
||||
hPct := int(hashed * 100 / b.Total)
|
||||
spread := dlPct - hPct
|
||||
spreadBytes := downloaded - hashed
|
||||
|
||||
msg := fmt.Sprintf("progress: downloaded %d%% | hashed %d%% | spread %d%%", dlPct, hPct, spread)
|
||||
if spreadBytes > pageCacheWarningBytes {
|
||||
msg += fmt.Sprintf(" [WARNING: %.1fGB ahead, page cache pressure]", float64(spreadBytes)/(1<<30))
|
||||
}
|
||||
slog.Info(msg)
|
||||
case <-progressDone:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
close(progressDone)
|
||||
sh.Stop()
|
||||
|
|
|
|||
Loading…
Reference in New Issue