make our implementation of oci content store public, remove redundant wrapper Store methods in favor of OCI implementation, add tests for store.Copy*()

This commit is contained in:
Josh Wolf
2021-12-09 11:09:09 -07:00
parent 61cbc6f614
commit cd93d7aaea
4 changed files with 152 additions and 56 deletions

7
pkg/store/errors.go Normal file
View File

@@ -0,0 +1,7 @@
package store
import "github.com/pkg/errors"
var (
ErrInvalidReference = errors.New("invalid reference")
)

View File

@@ -20,22 +20,23 @@ import (
"github.com/rancherfederal/hauler/pkg/consts"
)
var _ target.Target = (*oci)(nil)
var _ target.Target = (*OCI)(nil)
type oci struct {
type OCI struct {
root string
index *ocispec.Index
nameMap *sync.Map // map[string]ocispec.Descriptor
}
func NewOCI(root string) (*oci, error) {
return &oci{
func NewOCI(root string) (*OCI, error) {
o := &OCI{
root: root,
nameMap: &sync.Map{},
}, nil
}
return o, nil
}
func (o *oci) LoadIndex() error {
func (o *OCI) LoadIndex() error {
path := o.path(consts.OCIImageIndexFile)
idx, err := os.Open(path)
if err != nil {
@@ -63,7 +64,7 @@ func (o *oci) LoadIndex() error {
return nil
}
func (o *oci) SaveIndex() error {
func (o *OCI) SaveIndex() error {
var descs []ocispec.Descriptor
o.nameMap.Range(func(name, desc interface{}) bool {
n := name.(string)
@@ -95,7 +96,7 @@ func (o *oci) SaveIndex() error {
// While the name may differ from ref, it should itself be a valid ref.
//
// If the resolution fails, an error will be returned.
func (o *oci) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
func (o *OCI) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
if err := o.LoadIndex(); err != nil {
return "", ocispec.Descriptor{}, err
}
@@ -110,7 +111,7 @@ func (o *oci) Resolve(ctx context.Context, ref string) (name string, desc ocispe
// Fetcher returns a new fetcher for the provided reference.
// All content fetched from the returned fetcher will be
// from the namespace referred to by ref.
func (o *oci) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) {
func (o *OCI) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) {
if err := o.LoadIndex(); err != nil {
return nil, err
}
@@ -120,7 +121,7 @@ func (o *oci) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error)
return o, nil
}
func (o *oci) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
func (o *OCI) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
readerAt, err := o.blobReaderAt(desc)
if err != nil {
return nil, err
@@ -131,7 +132,7 @@ func (o *oci) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser
// Pusher returns a new pusher for the provided reference
// The returned Pusher should satisfy content.Ingester and concurrent attempts
// to push the same blob using the Ingester API should result in ErrUnavailable.
func (o *oci) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) {
func (o *OCI) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) {
if err := o.LoadIndex(); err != nil {
return nil, err
}
@@ -149,7 +150,21 @@ func (o *oci) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) {
}, nil
}
func (o *oci) blobReaderAt(desc ocispec.Descriptor) (*os.File, error) {
func (o *OCI) Walk(fn func(reference string, desc ocispec.Descriptor) error) error {
if err := o.LoadIndex(); err != nil {
return err
}
o.nameMap.Range(func(key, value interface{}) bool {
if err := fn(key.(string), value.(ocispec.Descriptor)); err != nil {
return false
}
return true
})
return nil
}
func (o *OCI) blobReaderAt(desc ocispec.Descriptor) (*os.File, error) {
blobPath, err := o.ensureBlob(desc)
if err != nil {
return nil, err
@@ -157,7 +172,7 @@ func (o *oci) blobReaderAt(desc ocispec.Descriptor) (*os.File, error) {
return os.Open(blobPath)
}
func (o *oci) blobWriterAt(desc ocispec.Descriptor) (*os.File, error) {
func (o *OCI) blobWriterAt(desc ocispec.Descriptor) (*os.File, error) {
blobPath, err := o.ensureBlob(desc)
if err != nil {
return nil, err
@@ -165,7 +180,7 @@ func (o *oci) blobWriterAt(desc ocispec.Descriptor) (*os.File, error) {
return os.OpenFile(blobPath, os.O_WRONLY|os.O_CREATE, 0644)
}
func (o *oci) ensureBlob(desc ocispec.Descriptor) (string, error) {
func (o *OCI) ensureBlob(desc ocispec.Descriptor) (string, error) {
dir := o.path("blobs", desc.Digest.Algorithm().String())
if err := os.MkdirAll(dir, os.ModePerm); err != nil && !os.IsExist(err) {
return "", err
@@ -173,13 +188,13 @@ func (o *oci) ensureBlob(desc ocispec.Descriptor) (string, error) {
return filepath.Join(dir, desc.Digest.Hex()), nil
}
func (o *oci) path(elem ...string) string {
func (o *OCI) path(elem ...string) string {
complete := []string{string(o.root)}
return filepath.Join(append(complete, elem...)...)
}
type ociPusher struct {
oci *oci
oci *OCI
ref string
digest string
}

View File

@@ -3,15 +3,12 @@ package store
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"fmt"
"os"
"path/filepath"
"github.com/google/go-containerregistry/pkg/name"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"oras.land/oras-go/pkg/content"
"oras.land/oras-go/pkg/oras"
"oras.land/oras-go/pkg/target"
@@ -23,23 +20,19 @@ import (
type Store struct {
Root string
store *content.OCI
Store *OCI
cache cache.Cache
}
var (
ErrInvalidReference = errors.New("invalid reference")
)
func NewStore(rootdir string, opts ...Options) (*Store, error) {
ociStore, err := content.NewOCI(rootdir)
ociStore, err := NewOCI(rootdir)
if err != nil {
return nil, err
}
b := &Store{
Root: rootdir,
store: ociStore,
Store: ociStore,
}
for _, opt := range opts {
@@ -67,11 +60,11 @@ func (s *Store) AddArtifact(ctx context.Context, oci artifact.OCI, reference str
// Ensure that index.docker.io isn't prepended
ref, err := name.ParseReference(reference, name.WithDefaultRegistry(""), name.WithDefaultTag("latest"))
if err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "adding artifact")
return ocispec.Descriptor{}, fmt.Errorf("%w", ErrInvalidReference)
}
if err := stage.add(ctx, oci, ref); err != nil {
return ocispec.Descriptor{}, err
return ocispec.Descriptor{}, fmt.Errorf("adding artifact: %w", err)
}
return stage.commit(ctx, s)
}
@@ -117,56 +110,43 @@ func (s *Store) Flush(ctx context.Context) error {
return nil
}
func (s *Store) Open(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
readerAt, err := s.store.ReaderAt(ctx, desc)
if err != nil {
return nil, err
}
// just wrap the ReaderAt with a Reader
return ioutil.NopCloser(content.NewReaderAtWrapper(readerAt)), nil
}
func (s *Store) Walk(fn func(desc ocispec.Descriptor) error) error {
refs := s.store.ListReferences()
for _, desc := range refs {
if err := fn(desc); err != nil {
return err
}
}
return nil
}
// Copy will copy a given reference to a given target.Target
// This is essentially a wrapper around oras.Copy, but locked to this content store
func (s *Store) Copy(ctx context.Context, ref string, to target.Target, toRef string) (ocispec.Descriptor, error) {
return oras.Copy(ctx, s.store, ref, to, toRef,
return oras.Copy(ctx, s.Store, ref, to, toRef,
oras.WithAdditionalCachedMediaTypes(consts.DockerManifestSchema2))
}
// CopyAll performs bulk copy operations on the stores oci layout to a provided target.Target
func (s *Store) CopyAll(ctx context.Context, to target.Target, toMapper func(string) (string, error)) error {
for ref := range s.store.ListReferences() {
func (s *Store) CopyAll(ctx context.Context, to target.Target, toMapper func(string) (string, error)) ([]ocispec.Descriptor, error) {
var descs []ocispec.Descriptor
err := s.Store.Walk(func(reference string, desc ocispec.Descriptor) error {
toRef := ""
if toMapper != nil {
tr, err := toMapper(ref)
tr, err := toMapper(reference)
if err != nil {
return err
}
toRef = tr
}
_, err := s.Copy(ctx, ref, to, toRef)
desc, err := s.Copy(ctx, reference, to, toRef)
if err != nil {
return err
}
descs = append(descs, desc)
return nil
})
if err != nil {
return nil, err
}
return nil
return descs, nil
}
// Identify is a helper function that will identify a human-readable content type given a descriptor
func (s *Store) Identify(ctx context.Context, desc ocispec.Descriptor) string {
rc, err := s.store.Fetch(ctx, desc)
rc, err := s.Store.Fetch(ctx, desc)
if err != nil {
return ""
}

View File

@@ -5,14 +5,22 @@ import (
"context"
"encoding/json"
"os"
"path/filepath"
"reflect"
"testing"
"github.com/google/go-containerregistry/pkg/name"
gv1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/partial"
"github.com/google/go-containerregistry/pkg/v1/random"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/pkg/content"
"oras.land/oras-go/pkg/target"
_ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem"
_ "github.com/distribution/distribution/v3/registry/storage/driver/inmemory"
"github.com/rancherfederal/hauler/internal/server"
"github.com/rancherfederal/hauler/pkg/artifact"
"github.com/rancherfederal/hauler/pkg/store"
)
@@ -113,3 +121,89 @@ func genArtifact(t *testing.T, ref string) (artifact.OCI, ocispec.Descriptor) {
}
return &testImage{Image: img}, m
}
func TestStore_CopyAll(t *testing.T) {
ctx := context.Background()
tmpdir, err := os.MkdirTemp("", "hauler")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpdir)
tmpdirRegistry := filepath.Join(tmpdir, "registry")
if err := os.Mkdir(tmpdirRegistry, os.ModePerm); err != nil {
t.Fatal()
}
tmpdirStore := filepath.Join(tmpdir, "store")
if err := os.Mkdir(tmpdirStore, os.ModePerm); err != nil {
t.Fatal()
}
r := server.NewTempRegistry(ctx, tmpdirRegistry)
if err := r.Start(); err != nil {
t.Error(err)
}
defer r.Stop()
rc, err := content.NewRegistry(content.RegistryOptions{Insecure: true})
if err != nil {
t.Fatal(err)
}
type args struct {
ctx context.Context
to target.Target
toMapper func(string) (string, error)
refs []string
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "should copy an artifact to a registry",
args: args{
ctx: ctx,
to: rc,
toMapper: nil,
refs: []string{"tester:tester"},
},
wantErr: false,
},
{
name: "should copy a lot of artifacts to a registry (test concurrency)",
args: args{
ctx: ctx,
to: rc,
toMapper: nil,
refs: []string{"a/b:c", "a/c:b", "b/c:d", "z/y:w", "u/q:a", "y/y:y", "z/z:z", "b/b:b", "c/c:c"},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s, err := store.NewStore(tmpdirStore)
if err != nil {
t.Fatal(err)
}
for _, ref := range tt.args.refs {
locRef, _ := name.ParseReference(ref, name.WithDefaultRegistry(r.Registry()))
a, _ := genArtifact(t, locRef.Name())
if _, err := s.AddArtifact(ctx, a, locRef.Name()); err != nil {
t.Errorf("Failed to generate store contents for CopyAll(): %v", err)
}
}
if descs, err := s.CopyAll(tt.args.ctx, tt.args.to, tt.args.toMapper); (err != nil) != tt.wantErr {
t.Errorf("CopyAll() error = %v, wantErr %v", err, tt.wantErr)
} else if len(descs) != len(tt.args.refs) {
t.Errorf("CopyAll() expected to push %d descriptors, but only pushed %d", len(descs), len(tt.args.refs))
}
})
}
}