revert unwanted changes

jmorganca 2025-12-20 22:00:35 -08:00
parent 2aee6c172b
commit 9a8c2a4635
3 changed files with 31 additions and 44 deletions


@@ -15,6 +15,7 @@ import (
	"net/url"
	"os"
	"path/filepath"
	"slices"
	"strconv"
	"strings"
	"sync"
@@ -59,16 +60,9 @@ func (s *speedTracker) Median() float64 {
	if len(s.speeds) < 3 {
		return 0 // not enough data
	}
	// Simple median: sort a copy and take middle
	sorted := make([]float64, len(s.speeds))
	copy(sorted, s.speeds)
	for i := range sorted {
		for j := i + 1; j < len(sorted); j++ {
			if sorted[j] < sorted[i] {
				sorted[i], sorted[j] = sorted[j], sorted[i]
			}
		}
	}
	sorted := slices.Clone(s.speeds)
	slices.Sort(sorted)
	return sorted[len(sorted)/2]
}
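
The replacement leans on the slices package from Go 1.21 (hence the new import above): cloning before sorting keeps the tracker's sample window in arrival order. A minimal self-contained sketch of the same idiom, with made-up sample values:

package main

import (
	"fmt"
	"slices"
)

// median sorts a copy so the caller's slice is left untouched.
func median(samples []float64) float64 {
	if len(samples) < 3 {
		return 0 // not enough data, matching speedTracker.Median
	}
	sorted := slices.Clone(samples)
	slices.Sort(sorted)
	return sorted[len(sorted)/2]
}

func main() {
	speeds := []float64{12.5, 3.1, 8.9, 20.0, 7.4}
	fmt.Println(median(speeds)) // prints 8.9
}
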
@@ -183,7 +177,7 @@ func (h *streamHasher) MarkComplete(partIndex int) {
	h.mu.Unlock()
}

// Run reads and hashes the file sequentially. Call in a goroutine.
// Run reads and hashes the file sequentially
func (h *streamHasher) Run() {
	buf := make([]byte, 64*1024) // 64KB read buffer
	var offset int64
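
For context, a sequential hasher of this shape is a single-pass read loop feeding a running digest. The sketch below shows only that core loop; the real streamHasher also coordinates with MarkComplete before reading past unfinished parts, which is omitted here, and the sha256 choice is an assumption:

package server

import (
	"crypto/sha256"
	"encoding/hex"
	"io"
	"os"
)

// hashSequential is an illustrative stand-in for a Run-style loop:
// one fixed 64KB buffer, one front-to-back pass over the file.
func hashSequential(f *os.File) (string, error) {
	h := sha256.New()
	buf := make([]byte, 64*1024)
	var offset int64
	for {
		n, err := f.ReadAt(buf, offset)
		if n > 0 {
			h.Write(buf[:n]) // hash.Hash writes never fail
			offset += int64(n)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err
		}
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}
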
@@ -399,47 +393,18 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
		return err
	}

	// Download chunks to disk, hash by reading from page cache.
	// Memory: ~64KB (hasher read buffer only), regardless of concurrency.
	// Download chunks to disk, hash sequentially
	// The hasher follows behind the downloaders, reading recently-written
	// data from OS page cache (RAM) rather than disk.
	sh := newStreamHasher(file, b.Parts, b.Total)
	tracker := &speedTracker{}

	// Start hasher goroutine
	hashDone := make(chan struct{})
	go func() {
		sh.Run()
		close(hashDone)
	}()

	// Log progress periodically
	// Page cache warning: if spread > 4GB, hasher may hit disk instead of RAM
	const pageCacheWarningBytes = 4 << 30 // 4GB
	progressDone := make(chan struct{})
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				downloaded := b.Completed.Load()
				hashed := sh.Hashed()
				dlPct := int(downloaded * 100 / b.Total)
				hPct := int(hashed * 100 / b.Total)
				spread := dlPct - hPct
				spreadBytes := downloaded - hashed
				slog.Debug(fmt.Sprintf("progress: downloaded %d%% | hashed %d%% | spread %d%%", dlPct, hPct, spread))
				if spreadBytes > pageCacheWarningBytes {
					slog.Debug("page cache pressure", "ahead", fmt.Sprintf("%.1fGB", float64(spreadBytes)/(1<<30)))
				}
			case <-progressDone:
				return
			}
		}
	}()

	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(downloadConcurrency)
	for i := range b.Parts {
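
The fan-out around this loop is errgroup with a concurrency cap: SetLimit blocks further Go calls until a slot frees, and the first error cancels the group's context. A reduced sketch of that pattern, where downloadPart is a hypothetical per-part worker:

package server

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// downloadAll runs one goroutine per part, at most limit at a time.
func downloadAll(ctx context.Context, parts []blobDownloadPart, limit int) error {
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(limit)
	for i := range parts {
		part := &parts[i]
		g.Go(func() error {
			return downloadPart(ctx, part) // hypothetical per-part worker
		})
	}
	return g.Wait() // first non-nil error, after all goroutines return
}
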
@@ -482,14 +447,12 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
	}

	if err := g.Wait(); err != nil {
		close(progressDone)
		sh.Stop()
		return err
	}

	// Wait for hasher to finish
	<-hashDone
	close(progressDone)

	if err := sh.Err(); err != nil {
		return err
	}
@@ -518,7 +481,6 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *regis
}

// downloadChunkToDisk streams a part directly to disk at its offset.
// Memory: ~32KB (read buffer only).
// If skipSlowCheck is true, don't flag slow parts (used after repeated slow retries).
func (b *blobDownload) downloadChunkToDisk(ctx context.Context, requestURL *url.URL, file *os.File, part *blobDownloadPart, tracker *speedTracker, skipSlowCheck bool) error {
	g, ctx := errgroup.WithContext(ctx)
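
A chunk download of this shape is typically an HTTP Range request copied into the file at the part's offset. A hedged sketch of that mechanic only; the retry, slow-part flagging, and speed tracking of the real downloadChunkToDisk are omitted, and the names here are illustrative:

package server

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"os"
)

// fetchRange streams bytes [off, off+size) of url into f at offset off.
func fetchRange(ctx context.Context, url string, f *os.File, off, size int64) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", off, off+size-1))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusPartialContent {
		return fmt.Errorf("unexpected status %s", resp.Status)
	}

	// ~32KB copy buffer; io.NewOffsetWriter targets the part's offset.
	buf := make([]byte, 32*1024)
	_, err = io.CopyBuffer(io.NewOffsetWriter(f, off), resp.Body, buf)
	return err
}
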

server/sparse_common.go Normal file

@@ -0,0 +1,8 @@
//go:build !windows

package server

import "os"

func setSparse(*os.File) {
}
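
On non-Windows platforms this is deliberately a no-op: Unix filesystems generally create holes on their own when writes land past existing data. A hedged sketch of how a caller might combine setSparse with preallocation; prepareBlobFile and its parameters are hypothetical, not part of this commit:

package server

import "os"

// prepareBlobFile creates the destination, marks it sparse, and
// preallocates its logical size.
func prepareBlobFile(name string, totalSize int64) (*os.File, error) {
	f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o644)
	if err != nil {
		return nil, err
	}
	setSparse(f) // no-op on this build; marks the handle sparse on Windows
	// Truncate sets the logical size so concurrent parts can WriteAt
	// their offsets; unwritten regions stay holes on disk.
	if err := f.Truncate(totalSize); err != nil {
		f.Close()
		return nil, err
	}
	return f, nil
}
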

server/sparse_windows.go Normal file

@@ -0,0 +1,17 @@
package server

import (
	"os"

	"golang.org/x/sys/windows"
)

func setSparse(file *os.File) {
	// exFAT (and other FS types) don't support sparse files, so ignore errors
	windows.DeviceIoControl( //nolint:errcheck
		windows.Handle(file.Fd()), windows.FSCTL_SET_SPARSE,
		nil, 0,
		nil, 0,
		nil, nil,
	)
}
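
Errors are discarded because FSCTL_SET_SPARSE fails on filesystems without sparse support (exFAT, FAT32) and the download still works there, just without holes. If one wanted to check whether the attribute took effect, a sketch along these lines should work; isSparse is not part of this commit:

//go:build windows

package server

import (
	"os"

	"golang.org/x/sys/windows"
)

// isSparse reports whether the sparse attribute is set on the file.
func isSparse(file *os.File) bool {
	var info windows.ByHandleFileInformation
	if err := windows.GetFileInformationByHandle(windows.Handle(file.Fd()), &info); err != nil {
		return false
	}
	const fileAttributeSparseFile = 0x200 // FILE_ATTRIBUTE_SPARSE_FILE
	return info.FileAttributes&fileAttributeSparseFile != 0
}
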