ollama/x/registry/server_test.go

254 lines
5.5 KiB
Go

package registry
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http/httptest"
"os"
"os/exec"
"strings"
"testing"
"time"
"bllamo.com/registry/apitype"
"bllamo.com/utils/backoff"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"kr.dev/diff"
)
const abc = "abcdefghijklmnopqrstuvwxyz"
func testPush(t *testing.T, chunkSize int64) {
t.Run(fmt.Sprintf("chunkSize=%d", chunkSize), func(t *testing.T) {
mc := startMinio(t, false)
manifest := []byte(`{
"layers": [
{"digest": "sha256-1", "size": 1},
{"digest": "sha256-2", "size": 2},
{"digest": "sha256-3", "size": 3}
]
}`)
const ref = "registry.ollama.ai/x/y:latest+Z"
hs := httptest.NewServer(&Server{
minioClient: mc,
UploadChunkSize: chunkSize,
})
t.Cleanup(hs.Close)
c := &Client{BaseURL: hs.URL}
requirements, err := c.Push(context.Background(), ref, manifest, nil)
if err != nil {
t.Fatal(err)
}
if len(requirements) < 3 {
t.Fatalf("expected at least 3 requirements; got %d", len(requirements))
t.Logf("requirements: %v", requirements)
}
var uploaded []apitype.CompletePart
for i, r := range requirements {
t.Logf("[%d] pushing layer: offset=%d size=%d", i, r.Offset, r.Size)
body := strings.NewReader(abc)
etag, err := PushLayer(context.Background(), r.URL, r.Offset, r.Size, body)
if err != nil {
t.Fatal(err)
}
uploaded = append(uploaded, apitype.CompletePart{
URL: r.URL,
ETag: etag,
})
}
requirements, err = c.Push(context.Background(), ref, manifest, &PushParams{
Uploaded: uploaded,
})
if err != nil {
t.Fatal(err)
}
if len(requirements) != 0 {
t.Fatalf("unexpected requirements: %v", requirements)
}
var paths []string
keys := mc.ListObjects(context.Background(), "test", minio.ListObjectsOptions{
Recursive: true,
})
for k := range keys {
paths = append(paths, k.Key)
}
t.Logf("paths: %v", paths)
diff.Test(t, t.Errorf, paths, []string{
"blobs/sha256-1",
"blobs/sha256-2",
"blobs/sha256-3",
"manifests/registry.ollama.ai/x/y/latest/Z",
})
obj, err := mc.GetObject(context.Background(), "test", "manifests/registry.ollama.ai/x/y/latest/Z", minio.GetObjectOptions{})
if err != nil {
t.Fatal(err)
}
defer obj.Close()
var gotM apitype.Manifest
if err := json.NewDecoder(obj).Decode(&gotM); err != nil {
t.Fatal(err)
}
diff.Test(t, t.Errorf, gotM, apitype.Manifest{
Layers: []apitype.Layer{
{Digest: "sha256-1", Size: 1},
{Digest: "sha256-2", Size: 2},
{Digest: "sha256-3", Size: 3},
},
})
// checksum the blobs
for i, l := range gotM.Layers {
obj, err := mc.GetObject(context.Background(), "test", "blobs/"+l.Digest, minio.GetObjectOptions{})
if err != nil {
t.Fatal(err)
}
defer obj.Close()
info, err := obj.Stat()
if err != nil {
t.Fatal(err)
}
t.Logf("[%d] layer info: name=%q l.Size=%d size=%d", i, info.Key, l.Size, info.Size)
data, err := io.ReadAll(obj)
if err != nil {
t.Fatal(err)
}
got := string(data)
want := abc[:l.Size]
if got != want {
t.Errorf("[%d] got layer data = %q; want %q", i, got, want)
}
}
})
}
func TestPush(t *testing.T) {
testPush(t, 0)
testPush(t, 1)
}
func availableAddr() string {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic(err)
}
defer l.Close()
return l.Addr().String()
}
func startMinio(t *testing.T, debug bool) *minio.Client {
t.Helper()
dir := t.TempDir()
t.Logf(">> minio data dir: %s", dir)
addr := availableAddr()
cmd := exec.Command("minio", "server", "--address", addr, dir)
cmd.Env = os.Environ()
if debug {
stdout, err := cmd.StdoutPipe()
if err != nil {
t.Fatal(err)
}
doneLogging := make(chan struct{})
t.Cleanup(func() {
<-doneLogging
})
go func() {
defer close(doneLogging)
sc := bufio.NewScanner(stdout)
for sc.Scan() {
t.Logf("minio: %s", sc.Text())
}
}()
}
// TODO(bmizerany): wait delay etc...
if err := cmd.Start(); err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
cmd.Process.Kill()
if err := cmd.Wait(); err != nil {
var e *exec.ExitError
if errors.As(err, &e) && e.Exited() {
t.Logf("minio stderr: %s", e.Stderr)
t.Logf("minio exit status: %v", e.ExitCode())
t.Logf("minio exited: %v", e.Exited())
t.Error(err)
}
}
})
mc, err := minio.New(addr, &minio.Options{
Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
Secure: false,
})
if err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
deadline, ok := t.Deadline()
if ok {
ctx, cancel = context.WithDeadline(ctx, deadline.Add(-100*time.Millisecond))
defer cancel()
}
// wait for server to start with exponential backoff
for _, err := range backoff.Upto(ctx, 1*time.Second) {
if err != nil {
t.Fatal(err)
}
if mc.IsOnline() {
break
}
}
if err := mc.MakeBucket(context.Background(), "test", minio.MakeBucketOptions{}); err != nil {
t.Fatal(err)
}
return mc
}
// contextForTest returns a context that is canceled when the test deadline,
// if any, is reached. The returned doneLogging function should be called
// after all Log/Error/Fatalf calls are done before the test returns.
func contextForTest(t *testing.T) (_ context.Context, doneLogging func()) {
done := make(chan struct{})
deadline, ok := t.Deadline()
if !ok {
return context.Background(), func() {}
}
ctx, cancel := context.WithDeadline(context.Background(), deadline.Add(-100*time.Millisecond))
t.Cleanup(func() {
cancel()
<-done
})
return ctx, func() { close(done) }
}