jmorganca 2025-12-20 19:23:09 -08:00
parent f90d968b8b
commit 12ff2d1461
1 changed file with 266 additions and 113 deletions

@@ -1,126 +1,304 @@
 package server
 
 import (
-	"bytes"
 	"crypto/rand"
 	"crypto/sha256"
 	"fmt"
-	"io"
+	"os"
+	"sync"
 	"testing"
 )
 
-func TestOrderedWriter_InOrder(t *testing.T) {
-	var buf bytes.Buffer
-	hasher := sha256.New()
-	ow := newOrderedWriter(&buf, hasher)
-
-	// Submit parts in order
-	for i := 0; i < 5; i++ {
-		data := []byte{byte(i), byte(i), byte(i)}
-		if err := ow.Submit(i, data); err != nil {
-			t.Fatalf("Submit(%d) failed: %v", i, err)
-		}
-	}
-
-	// Verify output
-	expected := []byte{0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4}
-	if !bytes.Equal(buf.Bytes(), expected) {
-		t.Errorf("got %v, want %v", buf.Bytes(), expected)
-	}
-}
+func TestSpeedTracker_Median(t *testing.T) {
+	s := &speedTracker{}
+
+	// Less than 3 samples returns 0
+	s.Record(100)
+	s.Record(200)
+	if got := s.Median(); got != 0 {
+		t.Errorf("expected 0 with < 3 samples, got %f", got)
+	}
+
+	// With 3+ samples, returns median
+	s.Record(300)
+	// Samples: [100, 200, 300] -> median = 200
+	if got := s.Median(); got != 200 {
+		t.Errorf("expected median 200, got %f", got)
+	}
+
+	// Add more samples
+	s.Record(50)
+	s.Record(250)
+	// Samples: [100, 200, 300, 50, 250] sorted = [50, 100, 200, 250, 300] -> median = 200
+	if got := s.Median(); got != 200 {
+		t.Errorf("expected median 200, got %f", got)
+	}
+}
 
-func TestOrderedWriter_OutOfOrder(t *testing.T) {
-	var buf bytes.Buffer
-	hasher := sha256.New()
-	ow := newOrderedWriter(&buf, hasher)
-
-	// Submit parts out of order: 2, 4, 1, 0, 3
-	order := []int{2, 4, 1, 0, 3}
-	for _, i := range order {
-		data := []byte{byte(i), byte(i), byte(i)}
-		if err := ow.Submit(i, data); err != nil {
-			t.Fatalf("Submit(%d) failed: %v", i, err)
-		}
-	}
-
-	// Verify output is still in correct order
-	expected := []byte{0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4}
-	if !bytes.Equal(buf.Bytes(), expected) {
-		t.Errorf("got %v, want %v", buf.Bytes(), expected)
-	}
-}
+func TestSpeedTracker_RollingWindow(t *testing.T) {
+	s := &speedTracker{}
+
+	// Add 105 samples (should keep only last 100)
+	for i := 0; i < 105; i++ {
+		s.Record(float64(i))
+	}
+
+	s.mu.Lock()
+	if len(s.speeds) != 100 {
+		t.Errorf("expected 100 samples, got %d", len(s.speeds))
+	}
+	// First sample should be 5 (0-4 were dropped)
+	if s.speeds[0] != 5 {
+		t.Errorf("expected first sample to be 5, got %f", s.speeds[0])
+	}
+	s.mu.Unlock()
+}
 
+func TestSpeedTracker_Concurrent(t *testing.T) {
+	s := &speedTracker{}
+	var wg sync.WaitGroup
+	for i := 0; i < 100; i++ {
+		wg.Add(1)
+		go func(v int) {
+			defer wg.Done()
+			s.Record(float64(v))
+			s.Median() // concurrent read
+		}(i)
+	}
+	wg.Wait()
+
+	// Should not panic, and should have reasonable state
+	s.mu.Lock()
+	if len(s.speeds) == 0 || len(s.speeds) > 100 {
+		t.Errorf("unexpected speeds length: %d", len(s.speeds))
+	}
+	s.mu.Unlock()
+}
 
+func TestStreamHasher_Sequential(t *testing.T) {
+	// Create temp file
+	f, err := os.CreateTemp("", "streamhasher_test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	// Write test data
+	data := []byte("hello world, this is a test of the stream hasher")
+	if _, err := f.Write(data); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create parts
+	parts := []*blobDownloadPart{
+		{Offset: 0, Size: int64(len(data))},
+	}
+	sh := newStreamHasher(f, parts, int64(len(data)))
+
+	// Mark complete and run
+	sh.MarkComplete(0)
+	done := make(chan struct{})
+	go func() {
+		sh.Run()
+		close(done)
+	}()
+	<-done
+
+	// Verify digest
+	expected := fmt.Sprintf("sha256:%x", sha256.Sum256(data))
+	if got := sh.Digest(); got != expected {
+		t.Errorf("digest mismatch: got %s, want %s", got, expected)
+	}
+	if err := sh.Err(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+}
 
-func TestOrderedWriter_Digest(t *testing.T) {
-	var buf bytes.Buffer
-	hasher := sha256.New()
-	ow := newOrderedWriter(&buf, hasher)
-
-	// Submit some data
-	data := []byte("hello world")
-	if err := ow.Submit(0, data); err != nil {
-		t.Fatalf("Submit failed: %v", err)
-	}
-
-	// Verify digest format and correctness
-	got := ow.Digest()
-	if len(got) != 71 { // "sha256:" + 64 hex chars
-		t.Errorf("digest has wrong length: %d, got: %s", len(got), got)
-	}
-	if got[:7] != "sha256:" {
-		t.Errorf("digest doesn't start with sha256: %s", got)
-	}
-
-	// Verify it matches expected hash
-	expectedHash := sha256.Sum256(data)
-	want := "sha256:" + fmt.Sprintf("%x", expectedHash[:])
-	if got != want {
-		t.Errorf("digest mismatch: got %s, want %s", got, want)
-	}
-}
 
+func TestStreamHasher_OutOfOrderCompletion(t *testing.T) {
+	// Create temp file
+	f, err := os.CreateTemp("", "streamhasher_test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	// Write test data (3 parts of 10 bytes each)
+	data := []byte("0123456789ABCDEFGHIJabcdefghij")
+	if _, err := f.Write(data); err != nil {
+		t.Fatal(err)
+	}
+
+	// Create 3 parts
+	parts := []*blobDownloadPart{
+		{N: 0, Offset: 0, Size: 10},
+		{N: 1, Offset: 10, Size: 10},
+		{N: 2, Offset: 20, Size: 10},
+	}
+	sh := newStreamHasher(f, parts, int64(len(data)))
+
+	done := make(chan struct{})
+	go func() {
+		sh.Run()
+		close(done)
+	}()
+
+	// Mark parts complete out of order: 2, 0, 1
+	sh.MarkComplete(2)
+	sh.MarkComplete(0) // This should trigger hashing of part 0
+	sh.MarkComplete(1) // This should trigger hashing of parts 1 and 2
+	<-done
+
+	// Verify digest
+	expected := fmt.Sprintf("sha256:%x", sha256.Sum256(data))
+	if got := sh.Digest(); got != expected {
+		t.Errorf("digest mismatch: got %s, want %s", got, expected)
+	}
+}
 
-func BenchmarkOrderedWriter_InOrder(b *testing.B) {
-	// Benchmark throughput when parts arrive in order (best case)
-	partSize := 64 * 1024 * 1024 // 64MB parts
-	numParts := 4
-	data := make([]byte, partSize)
-	rand.Read(data)
-
-	b.SetBytes(int64(partSize * numParts))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		ow := newOrderedWriter(io.Discard, sha256.New())
-		for p := 0; p < numParts; p++ {
-			if err := ow.Submit(p, data); err != nil {
-				b.Fatal(err)
-			}
-		}
-	}
-}
 
+func TestStreamHasher_Stop(t *testing.T) {
+	// Create temp file
+	f, err := os.CreateTemp("", "streamhasher_test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	parts := []*blobDownloadPart{
+		{Offset: 0, Size: 100},
+	}
+	sh := newStreamHasher(f, parts, 100)
+
+	done := make(chan struct{})
+	go func() {
+		sh.Run()
+		close(done)
+	}()
+
+	// Stop without completing any parts
+	sh.Stop()
+	<-done
+
+	// Should exit cleanly without error
+	if err := sh.Err(); err != nil {
+		t.Errorf("unexpected error after Stop: %v", err)
+	}
+}
 
+func TestStreamHasher_HashedProgress(t *testing.T) {
+	// Create temp file with known data
+	f, err := os.CreateTemp("", "streamhasher_test")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	data := make([]byte, 1000)
+	rand.Read(data)
+	if _, err := f.Write(data); err != nil {
+		t.Fatal(err)
+	}
+
+	parts := []*blobDownloadPart{
+		{N: 0, Offset: 0, Size: 500},
+		{N: 1, Offset: 500, Size: 500},
+	}
+	sh := newStreamHasher(f, parts, 1000)
+
+	// Initially no progress
+	if got := sh.Hashed(); got != 0 {
+		t.Errorf("expected 0 hashed initially, got %d", got)
+	}
+
+	done := make(chan struct{})
+	go func() {
+		sh.Run()
+		close(done)
+	}()
+
+	// Complete part 0
+	sh.MarkComplete(0)
+	// Give hasher time to process
+	for i := 0; i < 100; i++ {
+		if sh.Hashed() >= 500 {
+			break
+		}
+	}
+
+	// Complete part 1
+	sh.MarkComplete(1)
+	<-done
+
+	if got := sh.Hashed(); got != 1000 {
+		t.Errorf("expected 1000 hashed, got %d", got)
+	}
+}
 
+func BenchmarkSpeedTracker_Record(b *testing.B) {
+	s := &speedTracker{}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Record(float64(i))
+	}
+}
 
+func BenchmarkSpeedTracker_Median(b *testing.B) {
+	s := &speedTracker{}
+	// Pre-populate with 100 samples
+	for i := 0; i < 100; i++ {
+		s.Record(float64(i))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		s.Median()
+	}
+}
 
+func BenchmarkStreamHasher(b *testing.B) {
+	// Create temp file with test data
+	f, err := os.CreateTemp("", "streamhasher_bench")
+	if err != nil {
+		b.Fatal(err)
+	}
+	defer os.Remove(f.Name())
+	defer f.Close()
+
+	size := 64 * 1024 * 1024 // 64MB
+	data := make([]byte, size)
+	rand.Read(data)
+	if _, err := f.Write(data); err != nil {
+		b.Fatal(err)
+	}
+
+	parts := []*blobDownloadPart{
+		{Offset: 0, Size: int64(size)},
+	}
+
+	b.SetBytes(int64(size))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		sh := newStreamHasher(f, parts, int64(size))
+		sh.MarkComplete(0)
+
+		done := make(chan struct{})
+		go func() {
+			sh.Run()
+			close(done)
+		}()
+		<-done
+	}
+}
 
-func BenchmarkOrderedWriter_OutOfOrder(b *testing.B) {
-	// Benchmark throughput when parts arrive out of order (worst case)
-	partSize := 64 * 1024 * 1024 // 64MB parts
-	numParts := 4
-	data := make([]byte, partSize)
-	rand.Read(data)
-
-	// Reverse order: 3, 2, 1, 0
-	order := make([]int, numParts)
-	for i := 0; i < numParts; i++ {
-		order[i] = numParts - 1 - i
-	}
-
-	b.SetBytes(int64(partSize * numParts))
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		ow := newOrderedWriter(io.Discard, sha256.New())
-		for _, p := range order {
-			if err := ow.Submit(p, data); err != nil {
-				b.Fatal(err)
-			}
-		}
-	}
-}
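
Neither speedTracker nor streamHasher is defined in this diff; only their tests are. The tests do pin down part of the shape: TestSpeedTracker_RollingWindow reaches directly into the mu and speeds fields, so the type holds a sync.Mutex named mu and a []float64 named speeds. What follows is a minimal sketch consistent with the tests above, not the committed implementation; the eviction strategy and the sort import are assumptions.

// Sketch only: inferred from the tests above, not taken from this commit.
// Assumes imports "sort" and "sync".
// speedTracker keeps a rolling window of the 100 most recent speed samples.
type speedTracker struct {
	mu     sync.Mutex
	speeds []float64
}

// Record appends a sample and evicts the oldest once the window is full,
// so after 105 records the window holds samples 5..104.
func (s *speedTracker) Record(v float64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.speeds = append(s.speeds, v)
	if len(s.speeds) > 100 {
		s.speeds = s.speeds[1:]
	}
}

// Median returns 0 until at least 3 samples exist, then the middle element
// of a sorted copy of the window (the tests only exercise odd counts).
func (s *speedTracker) Median() float64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.speeds) < 3 {
		return 0
	}
	sorted := append([]float64(nil), s.speeds...)
	sort.Float64s(sorted)
	return sorted[len(sorted)/2]
}

Sorting a copy inside Median, rather than keeping the window sorted on every Record, is one plausible layout given the benchmarks: Record looks like the hot path, Median is read occasionally.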
@ -139,28 +317,3 @@ func BenchmarkHashThroughput(b *testing.B) {
h.Sum(nil)
}
}
func BenchmarkOrderedWriter_Memory(b *testing.B) {
// Measure memory when buffering out-of-order parts
partSize := 64 * 1024 * 1024 // 64MB parts
numParts := 4
data := make([]byte, partSize)
rand.Read(data)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ow := newOrderedWriter(io.Discard, sha256.New())
// Submit all except part 0 (forces buffering)
for p := 1; p < numParts; p++ {
if err := ow.Submit(p, data); err != nil {
b.Fatal(err)
}
}
// Submit part 0 to flush
if err := ow.Submit(0, data); err != nil {
b.Fatal(err)
}
}
}
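
The deleted orderedWriter is likewise never shown in full. Its removed tests fix the contract as newOrderedWriter(w, hasher), Submit(partNumber, data), and Digest(), with out-of-order parts buffered in memory until the gap before them fills, which is the cost BenchmarkOrderedWriter_Memory measured above. A rough sketch of that contract; the field names and buffering strategy are guesses, not the removed code.

// Sketch only: inferred from the deleted tests, not the removed implementation.
// Assumes imports "fmt", "hash", and "io".
type orderedWriter struct {
	w       io.Writer
	hash    hash.Hash
	next    int            // next part number expected in sequence
	pending map[int][]byte // out-of-order parts held in memory
}

func newOrderedWriter(w io.Writer, h hash.Hash) *orderedWriter {
	return &orderedWriter{w: w, hash: h, pending: make(map[int][]byte)}
}

// Submit writes part n immediately if it is next in sequence, otherwise
// buffers a copy; writing a part drains any buffered successors.
func (ow *orderedWriter) Submit(n int, data []byte) error {
	if n != ow.next {
		ow.pending[n] = append([]byte(nil), data...)
		return nil
	}
	for {
		if _, err := ow.w.Write(data); err != nil {
			return err
		}
		ow.hash.Write(data) // hash.Hash writes never fail
		ow.next++
		buffered, ok := ow.pending[ow.next]
		if !ok {
			return nil
		}
		delete(ow.pending, ow.next)
		data = buffered
	}
}

func (ow *orderedWriter) Digest() string {
	return fmt.Sprintf("sha256:%x", ow.hash.Sum(nil))
}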
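
streamHasher, its replacement, is also defined outside this diff. The tests fix its surface: newStreamHasher(file, parts, total), MarkComplete(n), Run, Stop, Hashed, Digest, and Err, with blobDownloadPart (fields N, Offset, Size) coming from elsewhere in the package. Below is a sketch consistent with that surface, assuming a condition-variable design; everything beyond the tested names is invented here.

// Sketch only: inferred from the tests in this commit, not the actual code.
// Assumes imports "crypto/sha256", "fmt", "hash", "io", "sync", "sync/atomic".
// streamHasher hashes a partially written file in offset order: parts may
// finish downloading in any order, but part n is only hashed after parts
// 0..n-1, so the SHA-256 state always consumes bytes sequentially.
type streamHasher struct {
	mu       sync.Mutex
	cond     *sync.Cond
	r        io.ReaderAt
	parts    []*blobDownloadPart
	complete []bool
	stopped  bool
	hashed   int64 // accessed atomically
	hash     hash.Hash
	err      error
}

func newStreamHasher(r io.ReaderAt, parts []*blobDownloadPart, total int64) *streamHasher {
	sh := &streamHasher{
		r:        r,
		parts:    parts,
		complete: make([]bool, len(parts)),
		hash:     sha256.New(),
	}
	sh.cond = sync.NewCond(&sh.mu)
	_ = total // presumably used for progress reporting in the real code
	return sh
}

// MarkComplete records that part n is fully on disk and wakes Run.
func (sh *streamHasher) MarkComplete(n int) {
	sh.mu.Lock()
	sh.complete[n] = true
	sh.mu.Unlock()
	sh.cond.Broadcast()
}

// Stop makes Run return early; the tests treat this as a clean exit.
func (sh *streamHasher) Stop() {
	sh.mu.Lock()
	sh.stopped = true
	sh.mu.Unlock()
	sh.cond.Broadcast()
}

// Run blocks until each part in sequence is complete, then hashes its
// byte range straight from the file.
func (sh *streamHasher) Run() {
	for i, p := range sh.parts {
		sh.mu.Lock()
		for !sh.complete[i] && !sh.stopped {
			sh.cond.Wait()
		}
		stopped := sh.stopped
		sh.mu.Unlock()
		if stopped {
			return
		}
		n, err := io.Copy(sh.hash, io.NewSectionReader(sh.r, p.Offset, p.Size))
		atomic.AddInt64(&sh.hashed, n)
		if err != nil {
			sh.mu.Lock()
			sh.err = err
			sh.mu.Unlock()
			return
		}
	}
}

func (sh *streamHasher) Hashed() int64 { return atomic.LoadInt64(&sh.hashed) }

func (sh *streamHasher) Digest() string { return fmt.Sprintf("sha256:%x", sh.hash.Sum(nil)) }

func (sh *streamHasher) Err() error {
	sh.mu.Lock()
	defer sh.mu.Unlock()
	return sh.err
}

The apparent shift from orderedWriter is where out-of-order data waits: instead of buffering early parts in memory, Run re-reads completed byte ranges from the file and hashes strictly in offset order, which is what BenchmarkStreamHasher times end to end.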