diff --git a/server/download_test.go b/server/download_test.go index cd54b7e84..d45e1113a 100644 --- a/server/download_test.go +++ b/server/download_test.go @@ -1,126 +1,304 @@ package server import ( - "bytes" "crypto/rand" "crypto/sha256" "fmt" - "io" + "os" + "sync" "testing" ) -func TestOrderedWriter_InOrder(t *testing.T) { - var buf bytes.Buffer - hasher := sha256.New() - ow := newOrderedWriter(&buf, hasher) +func TestSpeedTracker_Median(t *testing.T) { + s := &speedTracker{} - // Submit parts in order - for i := 0; i < 5; i++ { - data := []byte{byte(i), byte(i), byte(i)} - if err := ow.Submit(i, data); err != nil { - t.Fatalf("Submit(%d) failed: %v", i, err) - } + // Less than 3 samples returns 0 + s.Record(100) + s.Record(200) + if got := s.Median(); got != 0 { + t.Errorf("expected 0 with < 3 samples, got %f", got) } - // Verify output - expected := []byte{0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4} - if !bytes.Equal(buf.Bytes(), expected) { - t.Errorf("got %v, want %v", buf.Bytes(), expected) + // With 3+ samples, returns median + s.Record(300) + // Samples: [100, 200, 300] -> median = 200 + if got := s.Median(); got != 200 { + t.Errorf("expected median 200, got %f", got) + } + + // Add more samples + s.Record(50) + s.Record(250) + // Samples: [100, 200, 300, 50, 250] sorted = [50, 100, 200, 250, 300] -> median = 200 + if got := s.Median(); got != 200 { + t.Errorf("expected median 200, got %f", got) } } -func TestOrderedWriter_OutOfOrder(t *testing.T) { - var buf bytes.Buffer - hasher := sha256.New() - ow := newOrderedWriter(&buf, hasher) +func TestSpeedTracker_RollingWindow(t *testing.T) { + s := &speedTracker{} - // Submit parts out of order: 2, 4, 1, 0, 3 - order := []int{2, 4, 1, 0, 3} - for _, i := range order { - data := []byte{byte(i), byte(i), byte(i)} - if err := ow.Submit(i, data); err != nil { - t.Fatalf("Submit(%d) failed: %v", i, err) - } + // Add 105 samples (should keep only last 100) + for i := 0; i < 105; i++ { + s.Record(float64(i)) } - // Verify output is still in correct order - expected := []byte{0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4} - if !bytes.Equal(buf.Bytes(), expected) { - t.Errorf("got %v, want %v", buf.Bytes(), expected) + s.mu.Lock() + if len(s.speeds) != 100 { + t.Errorf("expected 100 samples, got %d", len(s.speeds)) + } + // First sample should be 5 (0-4 were dropped) + if s.speeds[0] != 5 { + t.Errorf("expected first sample to be 5, got %f", s.speeds[0]) + } + s.mu.Unlock() +} + +func TestSpeedTracker_Concurrent(t *testing.T) { + s := &speedTracker{} + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(v int) { + defer wg.Done() + s.Record(float64(v)) + s.Median() // concurrent read + }(i) + } + wg.Wait() + + // Should not panic, and should have reasonable state + s.mu.Lock() + if len(s.speeds) == 0 || len(s.speeds) > 100 { + t.Errorf("unexpected speeds length: %d", len(s.speeds)) + } + s.mu.Unlock() +} + +func TestStreamHasher_Sequential(t *testing.T) { + // Create temp file + f, err := os.CreateTemp("", "streamhasher_test") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + + // Write test data + data := []byte("hello world, this is a test of the stream hasher") + if _, err := f.Write(data); err != nil { + t.Fatal(err) + } + + // Create parts + parts := []*blobDownloadPart{ + {Offset: 0, Size: int64(len(data))}, + } + + sh := newStreamHasher(f, parts, int64(len(data))) + + // Mark complete and run + sh.MarkComplete(0) + + done := make(chan struct{}) + go func() { + sh.Run() + close(done) + }() + <-done + + // Verify digest + expected := fmt.Sprintf("sha256:%x", sha256.Sum256(data)) + if got := sh.Digest(); got != expected { + t.Errorf("digest mismatch: got %s, want %s", got, expected) + } + + if err := sh.Err(); err != nil { + t.Errorf("unexpected error: %v", err) } } -func TestOrderedWriter_Digest(t *testing.T) { - var buf bytes.Buffer - hasher := sha256.New() - ow := newOrderedWriter(&buf, hasher) +func TestStreamHasher_OutOfOrderCompletion(t *testing.T) { + // Create temp file + f, err := os.CreateTemp("", "streamhasher_test") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() - // Submit some data - data := []byte("hello world") - if err := ow.Submit(0, data); err != nil { - t.Fatalf("Submit failed: %v", err) + // Write test data (3 parts of 10 bytes each) + data := []byte("0123456789ABCDEFGHIJabcdefghij") + if _, err := f.Write(data); err != nil { + t.Fatal(err) } - // Verify digest format and correctness - got := ow.Digest() - if len(got) != 71 { // "sha256:" + 64 hex chars - t.Errorf("digest has wrong length: %d, got: %s", len(got), got) - } - if got[:7] != "sha256:" { - t.Errorf("digest doesn't start with sha256: %s", got) + // Create 3 parts + parts := []*blobDownloadPart{ + {N: 0, Offset: 0, Size: 10}, + {N: 1, Offset: 10, Size: 10}, + {N: 2, Offset: 20, Size: 10}, } - // Verify it matches expected hash - expectedHash := sha256.Sum256(data) - want := "sha256:" + fmt.Sprintf("%x", expectedHash[:]) - if got != want { - t.Errorf("digest mismatch: got %s, want %s", got, want) + sh := newStreamHasher(f, parts, int64(len(data))) + + done := make(chan struct{}) + go func() { + sh.Run() + close(done) + }() + + // Mark parts complete out of order: 2, 0, 1 + sh.MarkComplete(2) + sh.MarkComplete(0) // This should trigger hashing of part 0 + sh.MarkComplete(1) // This should trigger hashing of parts 1 and 2 + + <-done + + // Verify digest + expected := fmt.Sprintf("sha256:%x", sha256.Sum256(data)) + if got := sh.Digest(); got != expected { + t.Errorf("digest mismatch: got %s, want %s", got, expected) } } -func BenchmarkOrderedWriter_InOrder(b *testing.B) { - // Benchmark throughput when parts arrive in order (best case) - partSize := 64 * 1024 * 1024 // 64MB parts - numParts := 4 - data := make([]byte, partSize) +func TestStreamHasher_Stop(t *testing.T) { + // Create temp file + f, err := os.CreateTemp("", "streamhasher_test") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + + parts := []*blobDownloadPart{ + {Offset: 0, Size: 100}, + } + + sh := newStreamHasher(f, parts, 100) + + done := make(chan struct{}) + go func() { + sh.Run() + close(done) + }() + + // Stop without completing any parts + sh.Stop() + <-done + + // Should exit cleanly without error + if err := sh.Err(); err != nil { + t.Errorf("unexpected error after Stop: %v", err) + } +} + +func TestStreamHasher_HashedProgress(t *testing.T) { + // Create temp file with known data + f, err := os.CreateTemp("", "streamhasher_test") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + + data := make([]byte, 1000) rand.Read(data) + if _, err := f.Write(data); err != nil { + t.Fatal(err) + } - b.SetBytes(int64(partSize * numParts)) + parts := []*blobDownloadPart{ + {N: 0, Offset: 0, Size: 500}, + {N: 1, Offset: 500, Size: 500}, + } + + sh := newStreamHasher(f, parts, 1000) + + // Initially no progress + if got := sh.Hashed(); got != 0 { + t.Errorf("expected 0 hashed initially, got %d", got) + } + + done := make(chan struct{}) + go func() { + sh.Run() + close(done) + }() + + // Complete part 0 + sh.MarkComplete(0) + + // Give hasher time to process + for i := 0; i < 100; i++ { + if sh.Hashed() >= 500 { + break + } + } + + // Complete part 1 + sh.MarkComplete(1) + <-done + + if got := sh.Hashed(); got != 1000 { + t.Errorf("expected 1000 hashed, got %d", got) + } +} + +func BenchmarkSpeedTracker_Record(b *testing.B) { + s := &speedTracker{} + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Record(float64(i)) + } +} + +func BenchmarkSpeedTracker_Median(b *testing.B) { + s := &speedTracker{} + // Pre-populate with 100 samples + for i := 0; i < 100; i++ { + s.Record(float64(i)) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.Median() + } +} + +func BenchmarkStreamHasher(b *testing.B) { + // Create temp file with test data + f, err := os.CreateTemp("", "streamhasher_bench") + if err != nil { + b.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + + size := 64 * 1024 * 1024 // 64MB + data := make([]byte, size) + rand.Read(data) + if _, err := f.Write(data); err != nil { + b.Fatal(err) + } + + parts := []*blobDownloadPart{ + {Offset: 0, Size: int64(size)}, + } + + b.SetBytes(int64(size)) b.ResetTimer() for i := 0; i < b.N; i++ { - ow := newOrderedWriter(io.Discard, sha256.New()) - for p := 0; p < numParts; p++ { - if err := ow.Submit(p, data); err != nil { - b.Fatal(err) - } - } - } -} + sh := newStreamHasher(f, parts, int64(size)) + sh.MarkComplete(0) -func BenchmarkOrderedWriter_OutOfOrder(b *testing.B) { - // Benchmark throughput when parts arrive out of order (worst case) - partSize := 64 * 1024 * 1024 // 64MB parts - numParts := 4 - data := make([]byte, partSize) - rand.Read(data) - - // Reverse order: 3, 2, 1, 0 - order := make([]int, numParts) - for i := 0; i < numParts; i++ { - order[i] = numParts - 1 - i - } - - b.SetBytes(int64(partSize * numParts)) - b.ResetTimer() - - for i := 0; i < b.N; i++ { - ow := newOrderedWriter(io.Discard, sha256.New()) - for _, p := range order { - if err := ow.Submit(p, data); err != nil { - b.Fatal(err) - } - } + done := make(chan struct{}) + go func() { + sh.Run() + close(done) + }() + <-done } } @@ -139,28 +317,3 @@ func BenchmarkHashThroughput(b *testing.B) { h.Sum(nil) } } - -func BenchmarkOrderedWriter_Memory(b *testing.B) { - // Measure memory when buffering out-of-order parts - partSize := 64 * 1024 * 1024 // 64MB parts - numParts := 4 - data := make([]byte, partSize) - rand.Read(data) - - b.ReportAllocs() - b.ResetTimer() - - for i := 0; i < b.N; i++ { - ow := newOrderedWriter(io.Discard, sha256.New()) - // Submit all except part 0 (forces buffering) - for p := 1; p < numParts; p++ { - if err := ow.Submit(p, data); err != nil { - b.Fatal(err) - } - } - // Submit part 0 to flush - if err := ow.Submit(0, data); err != nil { - b.Fatal(err) - } - } -}