From 641b9db8fda161e84144ca7c662c83f29260e7d4 Mon Sep 17 00:00:00 2001 From: Camryn Carter Date: Wed, 25 Mar 2026 07:59:51 -0700 Subject: [PATCH] chunk the haul (#519) * chunk the haul * validate numeric suffix on join * enforce valid chunk size * containerd warning * updated test.go files --- .github/workflows/tests.yaml | 17 +- cmd/hauler/cli/store/load.go | 35 +++- cmd/hauler/cli/store/save.go | 59 ++++++- cmd/hauler/cli/store/save_test.go | 96 +++++++++++ internal/flags/save.go | 2 + pkg/archives/archiver.go | 84 +++++++++ pkg/archives/archives_test.go | 277 ++++++++++++++++++++++++++++++ pkg/archives/unarchiver.go | 86 ++++++++++ 8 files changed, 652 insertions(+), 4 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 6e659c8..2f09d55 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -250,6 +250,13 @@ jobs: hauler store save --filename store.tar.zst # verify via save with filename and platform (amd64) hauler store save --filename store-amd64.tar.zst --platform linux/amd64 + # verify via save with chunk-size (splits into haul-chunked_0.tar.zst, haul-chunked_1.tar.zst, ...) + hauler store save --filename haul-chunked.tar.zst --chunk-size 50M + # verify chunk files exist and original is removed + ls haul-chunked_*.tar.zst + ! 
test -f haul-chunked.tar.zst + # verify at least two chunks were produced + [ $(ls haul-chunked_*.tar.zst | wc -l) -ge 2 ] - name: Remove Hauler Store Contents run: | @@ -269,6 +276,14 @@ jobs: hauler store load --filename store.tar.zst --tempdir /opt # verify via load with filename and platform (amd64) hauler store load --filename store-amd64.tar.zst + # verify via load from chunks using explicit first chunk + rm -rf store + hauler store load --filename haul-chunked_0.tar.zst + hauler store info + # verify via load from chunks using base filename (auto-detect) + rm -rf store + hauler store load --filename haul-chunked.tar.zst + hauler store info - name: Verify Hauler Store Contents run: | @@ -291,7 +306,7 @@ jobs: - name: Remove Hauler Store Contents run: | - rm -rf store haul.tar.zst store.tar.zst store-amd64.tar.zst + rm -rf store haul.tar.zst store.tar.zst store-amd64.tar.zst haul-chunked_*.tar.zst hauler store info - name: Verify - hauler store sync diff --git a/cmd/hauler/cli/store/load.go b/cmd/hauler/cli/store/load.go index f9973fa..62d1917 100644 --- a/cmd/hauler/cli/store/load.go +++ b/cmd/hauler/cli/store/load.go @@ -39,8 +39,9 @@ func LoadCmd(ctx context.Context, o *flags.LoadOpts, rso *flags.StoreRootOpts, r l.Debugf("using temporary directory at [%s]", tempDir) for _, fileName := range o.FileName { - l.Infof("loading haul [%s] to [%s]", fileName, o.StoreDir) - err := unarchiveLayoutTo(ctx, fileName, o.StoreDir, tempDir) + resolved := resolveHaulPath(fileName) + l.Infof("loading haul [%s] to [%s]", resolved, o.StoreDir) + err := unarchiveLayoutTo(ctx, resolved, o.StoreDir, tempDir) if err != nil { return err } @@ -85,6 +86,13 @@ func unarchiveLayoutTo(ctx context.Context, haulPath string, dest string, tempDi } } + // reassemble chunk files if haulPath matches the chunk naming pattern + joined, err := archives.JoinChunks(ctx, haulPath, tempDir) + if err != nil { + return err + } + haulPath = joined + if err := archives.Unarchive(ctx, haulPath, tempDir); 
// resolveHaulPath returns path unchanged when it exists on disk or is an
// HTTP(S) URL. When the file is missing, it looks for sibling chunk files
// named <base>_*<ext> in the same directory and returns the first match so
// JoinChunks can reassemble them; with no matches the original path is
// returned as-is.
func resolveHaulPath(path string) string {
	if strings.HasPrefix(path, "http://") || strings.HasPrefix(path, "https://") {
		return path
	}
	if _, err := os.Stat(path); err == nil {
		return path
	}

	// peel the compound extension (e.g. ".tar.zst") off the path
	stem, compoundExt := path, ""
	for {
		e := filepath.Ext(stem)
		if e == "" {
			break
		}
		compoundExt = e + compoundExt
		stem = strings.TrimSuffix(stem, e)
	}

	candidates, globErr := filepath.Glob(stem + "_*" + compoundExt)
	if globErr != nil || len(candidates) == 0 {
		return path
	}
	return candidates[0]
}
// parseChunkSize parses a human-readable byte size string (e.g. "1G", "500M",
// "2GB") into a byte count. Suffixes are treated as binary units (1K = 1024);
// a bare number is interpreted as bytes. The result must be strictly positive.
func parseChunkSize(s string) (int64, error) {
	// Ordered longest-first so "KB"/"MB"/... are tried before "K"/"M"/...;
	// a slice (rather than a map) keeps matching deterministic.
	suffixes := []struct {
		suffix string
		mult   int64
	}{
		{"KB", 1 << 10}, {"MB", 1 << 20}, {"GB", 1 << 30}, {"TB", 1 << 40},
		{"K", 1 << 10}, {"M", 1 << 20}, {"G", 1 << 30}, {"T", 1 << 40},
	}

	s = strings.ToUpper(strings.TrimSpace(s))

	num := s
	var mult int64 = 1
	for _, u := range suffixes {
		if strings.HasSuffix(s, u.suffix) {
			num = strings.TrimSuffix(s, u.suffix)
			mult = u.mult
			break
		}
	}

	n, err := strconv.ParseInt(num, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid chunk size %q: %w", s, err)
	}

	result := n * mult
	if result <= 0 {
		return 0, fmt.Errorf("chunk size must be greater than zero, received %q", s)
	}
	return result, nil
}
parseChunkSize unit tests +// -------------------------------------------------------------------------- + +func TestParseChunkSize(t *testing.T) { + tests := []struct { + name string + input string + want int64 + wantErr bool + }{ + {name: "kilobytes", input: "1K", want: 1 << 10}, + {name: "kilobytes long", input: "1KB", want: 1 << 10}, + {name: "megabytes", input: "500M", want: 500 << 20}, + {name: "megabytes long", input: "500MB", want: 500 << 20}, + {name: "gigabytes", input: "2G", want: 2 << 30}, + {name: "gigabytes long", input: "2GB", want: 2 << 30}, + {name: "terabytes", input: "1T", want: 1 << 40}, + {name: "terabytes long", input: "1TB", want: 1 << 40}, + {name: "plain bytes", input: "1024", want: 1024}, + {name: "lowercase", input: "1g", want: 1 << 30}, + {name: "whitespace trimmed", input: " 1G ", want: 1 << 30}, + {name: "zero is invalid", input: "0", wantErr: true}, + {name: "zero with suffix", input: "0M", wantErr: true}, + {name: "negative bytes", input: "-1", wantErr: true}, + {name: "negative with suffix", input: "-1G", wantErr: true}, + {name: "empty string", input: "", wantErr: true}, + {name: "invalid suffix", input: "1X", wantErr: true}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseChunkSize(tt.input) + if (err != nil) != tt.wantErr { + t.Fatalf("parseChunkSize(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + } + if !tt.wantErr && got != tt.want { + t.Errorf("parseChunkSize(%q) = %d, want %d", tt.input, got, tt.want) + } + }) + } +} + +// -------------------------------------------------------------------------- +// SaveCmd chunk-size integration tests +// Do NOT use t.Parallel() — SaveCmd calls os.Chdir. 
+// -------------------------------------------------------------------------- + +func TestSaveCmd_ChunkSize(t *testing.T) { + ctx := newTestContext(t) + host, _ := newLocalhostRegistry(t) + seedImage(t, host, "test/chunksave", "v1") + + s := newTestStore(t) + if err := s.AddImage(ctx, host+"/test/chunksave:v1", ""); err != nil { + t.Fatalf("AddImage: %v", err) + } + + archiveDir := t.TempDir() + archivePath := filepath.Join(archiveDir, "haul-chunked.tar.zst") + o := newSaveOpts(s.Root, archivePath) + o.ChunkSize = "1K" + + if err := SaveCmd(ctx, o, defaultRootOpts(s.Root), defaultCliOpts()); err != nil { + t.Fatalf("SaveCmd with chunk-size: %v", err) + } + + // original archive must be replaced by chunk files + if _, err := os.Stat(archivePath); !os.IsNotExist(err) { + t.Error("original archive should be removed after chunking") + } + + // at least one chunk must exist + matches, err := filepath.Glob(filepath.Join(archiveDir, "haul-chunked_*.tar.zst")) + if err != nil { + t.Fatalf("glob chunks: %v", err) + } + if len(matches) == 0 { + t.Fatal("expected at least one chunk file, found none") + } +} + +func TestSaveCmd_ChunkSize_Invalid(t *testing.T) { + ctx := newTestContext(t) + s := newTestStore(t) + if err := s.SaveIndex(); err != nil { + t.Fatalf("SaveIndex: %v", err) + } + + o := newSaveOpts(s.Root, filepath.Join(t.TempDir(), "haul.tar.zst")) + o.ChunkSize = "0" + + if err := SaveCmd(ctx, o, defaultRootOpts(s.Root), defaultCliOpts()); err == nil { + t.Fatal("SaveCmd: expected error for chunk-size=0, got nil") + } +} diff --git a/internal/flags/save.go b/internal/flags/save.go index 83bd2a6..92ea88a 100644 --- a/internal/flags/save.go +++ b/internal/flags/save.go @@ -10,6 +10,7 @@ type SaveOpts struct { FileName string Platform string ContainerdCompatibility bool + ChunkSize string } func (o *SaveOpts) AddFlags(cmd *cobra.Command) { @@ -18,5 +19,6 @@ func (o *SaveOpts) AddFlags(cmd *cobra.Command) { f.StringVarP(&o.FileName, "filename", "f", 
consts.DefaultHaulerArchiveName, "(Optional) Specify the name of outputted haul") f.StringVarP(&o.Platform, "platform", "p", "", "(Optional) Specify the platform for runtime imports... i.e. linux/amd64 (unspecified implies all)") f.BoolVar(&o.ContainerdCompatibility, "containerd", false, "(Optional) Enable import compatibility with containerd... removes oci-layout from the haul") + f.StringVar(&o.ChunkSize, "chunk-size", "", "(Optional) Split the output archive into chunks of the specified size (e.g. 1G, 500M, 2048M)") } diff --git a/pkg/archives/archiver.go b/pkg/archives/archiver.go index bedbd8f..cb9320b 100644 --- a/pkg/archives/archiver.go +++ b/pkg/archives/archiver.go @@ -3,8 +3,10 @@ package archives import ( "context" "fmt" + "io" "os" "path/filepath" + "strings" "github.com/mholt/archives" "hauler.dev/go/hauler/pkg/log" @@ -102,3 +104,85 @@ func Archive(ctx context.Context, dir, outfile string, compression archives.Comp l.Debugf("archive created successfully [%s]", outfile) return nil } + +// SplitArchive splits an existing archive into chunks of at most maxBytes each. +// Chunks are named _0, _1, ... where base is the archive +// path with all extensions stripped, and ext is the compound extension (e.g. .tar.zst). +// The original archive is removed after successful splitting. 
+func SplitArchive(ctx context.Context, archivePath string, maxBytes int64) ([]string, error) { + l := log.FromContext(ctx) + + // derive base path and compound extension by stripping all extensions + base := archivePath + ext := "" + for filepath.Ext(base) != "" { + ext = filepath.Ext(base) + ext + base = strings.TrimSuffix(base, filepath.Ext(base)) + } + + f, err := os.Open(archivePath) + if err != nil { + return nil, fmt.Errorf("failed to open archive for splitting: %w", err) + } + + var chunks []string + buf := make([]byte, 32*1024) + chunkIdx := 0 + var written int64 + var outf *os.File + + for { + if outf == nil { + chunkPath := fmt.Sprintf("%s_%d%s", base, chunkIdx, ext) + outf, err = os.Create(chunkPath) + if err != nil { + f.Close() + return nil, fmt.Errorf("failed to create chunk %d: %w", chunkIdx, err) + } + chunks = append(chunks, chunkPath) + l.Debugf("creating chunk [%s]", chunkPath) + written = 0 + chunkIdx++ + } + + remaining := maxBytes - written + readSize := int64(len(buf)) + if readSize > remaining { + readSize = remaining + } + + n, readErr := f.Read(buf[:readSize]) + if n > 0 { + if _, writeErr := outf.Write(buf[:n]); writeErr != nil { + outf.Close() + f.Close() + return nil, fmt.Errorf("failed to write to chunk: %w", writeErr) + } + written += int64(n) + } + + if readErr == io.EOF { + outf.Close() + outf = nil + break + } + if readErr != nil { + outf.Close() + f.Close() + return nil, fmt.Errorf("failed to read archive: %w", readErr) + } + + if written >= maxBytes { + outf.Close() + outf = nil + } + } + + f.Close() + if err := os.Remove(archivePath); err != nil { + return nil, fmt.Errorf("failed to remove original archive after splitting: %w", err) + } + + l.Infof("split archive [%s] into %d chunk(s)", filepath.Base(archivePath), len(chunks)) + return chunks, nil +} diff --git a/pkg/archives/archives_test.go b/pkg/archives/archives_test.go index 5b70efa..00c7ce2 100644 --- a/pkg/archives/archives_test.go +++ b/pkg/archives/archives_test.go @@ 
-1,8 +1,10 @@ package archives import ( + "bytes" "context" "encoding/json" + "fmt" "io" "os" "path/filepath" @@ -162,3 +164,278 @@ func TestSecurePath(t *testing.T) { }) } } + +// -------------------------------------------------------------------------- +// chunkInfo +// -------------------------------------------------------------------------- + +func TestChunkInfo(t *testing.T) { + tests := []struct { + name string + path string + wantBase string + wantExt string + wantIndex int + wantOk bool + }{ + { + name: "compound extension", + path: "/tmp/haul_3.tar.zst", + wantBase: "/tmp/haul", + wantExt: ".tar.zst", + wantIndex: 3, + wantOk: true, + }, + { + name: "single extension", + path: "/tmp/archive_0.zst", + wantBase: "/tmp/archive", + wantExt: ".zst", + wantIndex: 0, + wantOk: true, + }, + { + name: "large index", + path: "/tmp/haul_42.tar.zst", + wantBase: "/tmp/haul", + wantExt: ".tar.zst", + wantIndex: 42, + wantOk: true, + }, + { + name: "no numeric suffix", + path: "/tmp/haul.tar.zst", + wantOk: false, + }, + { + name: "alphabetic suffix", + path: "/tmp/haul_abc.tar.zst", + wantOk: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + base, ext, index, ok := chunkInfo(tt.path) + if ok != tt.wantOk { + t.Fatalf("chunkInfo() ok = %v, want %v", ok, tt.wantOk) + } + if !ok { + return + } + if base != tt.wantBase { + t.Errorf("chunkInfo() base = %q, want %q", base, tt.wantBase) + } + if ext != tt.wantExt { + t.Errorf("chunkInfo() ext = %q, want %q", ext, tt.wantExt) + } + if index != tt.wantIndex { + t.Errorf("chunkInfo() index = %d, want %d", index, tt.wantIndex) + } + }) + } +} + +// -------------------------------------------------------------------------- +// SplitArchive +// -------------------------------------------------------------------------- + +func TestSplitArchive(t *testing.T) { + ctx := testContext(t) + + tests := []struct { + name string + dataSize int + maxBytes int64 + }{ + {name: "splits into multiple chunks", 
dataSize: 100, maxBytes: 30}, + {name: "single chunk when data fits", dataSize: 50, maxBytes: 100}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir := t.TempDir() + archivePath := filepath.Join(dir, "haul.tar.zst") + data := make([]byte, tt.dataSize) + for i := range data { + data[i] = byte(i % 256) + } + if err := os.WriteFile(archivePath, data, 0o644); err != nil { + t.Fatal(err) + } + + chunks, err := SplitArchive(ctx, archivePath, tt.maxBytes) + if err != nil { + t.Fatalf("SplitArchive() error = %v", err) + } + if len(chunks) == 0 { + t.Fatal("SplitArchive() returned no chunks") + } + + // original archive must be removed + if _, err := os.Stat(archivePath); !os.IsNotExist(err) { + t.Error("original archive should be removed after splitting") + } + + // chunks must follow _N naming + for i, chunk := range chunks { + expected := filepath.Join(dir, fmt.Sprintf("haul_%d.tar.zst", i)) + if chunk != expected { + t.Errorf("chunk[%d] = %s, want %s", i, chunk, expected) + } + } + + // concatenating chunks must reproduce the original data + var combined []byte + for _, chunk := range chunks { + b, err := os.ReadFile(chunk) + if err != nil { + t.Fatal(err) + } + combined = append(combined, b...) 
+ } + if !bytes.Equal(combined, data) { + t.Error("combined chunks do not match original data") + } + }) + } +} + +func TestSplitArchive_MissingFile(t *testing.T) { + ctx := testContext(t) + dir := t.TempDir() + _, err := SplitArchive(ctx, filepath.Join(dir, "nonexistent.tar.zst"), 1<<30) + if err == nil { + t.Fatal("SplitArchive() expected error for missing file, got nil") + } +} + +// -------------------------------------------------------------------------- +// JoinChunks +// -------------------------------------------------------------------------- + +func TestJoinChunks(t *testing.T) { + ctx := testContext(t) + + t.Run("joins multiple chunks in order", func(t *testing.T) { + dir := t.TempDir() + tempDir := t.TempDir() + for i, content := range []string{"chunk0-data", "chunk1-data", "chunk2-data"} { + if err := os.WriteFile(filepath.Join(dir, fmt.Sprintf("haul_%d.tar.zst", i)), []byte(content), 0o644); err != nil { + t.Fatal(err) + } + } + + got, err := JoinChunks(ctx, filepath.Join(dir, "haul_0.tar.zst"), tempDir) + if err != nil { + t.Fatalf("JoinChunks() error = %v", err) + } + data, err := os.ReadFile(got) + if err != nil { + t.Fatal(err) + } + if want := []byte("chunk0-datachunk1-datachunk2-data"); !bytes.Equal(data, want) { + t.Errorf("JoinChunks() content = %q, want %q", data, want) + } + }) + + t.Run("any chunk triggers full assembly", func(t *testing.T) { + dir := t.TempDir() + tempDir := t.TempDir() + for i, content := range []string{"aaa", "bbb"} { + if err := os.WriteFile(filepath.Join(dir, fmt.Sprintf("data_%d.tar.zst", i)), []byte(content), 0o644); err != nil { + t.Fatal(err) + } + } + + // pass chunk_1, not chunk_0 — should still assemble from chunk_0 + got, err := JoinChunks(ctx, filepath.Join(dir, "data_1.tar.zst"), tempDir) + if err != nil { + t.Fatalf("JoinChunks() error = %v", err) + } + data, err := os.ReadFile(got) + if err != nil { + t.Fatal(err) + } + if want := []byte("aaabbb"); !bytes.Equal(data, want) { + t.Errorf("JoinChunks() 
content = %q, want %q", data, want) + } + }) + + t.Run("non-chunk file returned unchanged", func(t *testing.T) { + dir := t.TempDir() + nonChunk := filepath.Join(dir, "haul.tar.zst") + if err := os.WriteFile(nonChunk, []byte("not-a-chunk"), 0o644); err != nil { + t.Fatal(err) + } + got, err := JoinChunks(ctx, nonChunk, t.TempDir()) + if err != nil { + t.Fatalf("JoinChunks() error = %v", err) + } + if got != nonChunk { + t.Errorf("JoinChunks() = %s, want %s (unchanged)", got, nonChunk) + } + }) + + t.Run("non-numeric suffix files excluded", func(t *testing.T) { + dir := t.TempDir() + tempDir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, "haul_0.tar.zst"), []byte("valid"), 0o644); err != nil { + t.Fatal(err) + } + // glob matches this but chunkInfo rejects it + if err := os.WriteFile(filepath.Join(dir, "haul_foo.tar.zst"), []byte("invalid"), 0o644); err != nil { + t.Fatal(err) + } + + got, err := JoinChunks(ctx, filepath.Join(dir, "haul_0.tar.zst"), tempDir) + if err != nil { + t.Fatalf("JoinChunks() error = %v", err) + } + data, err := os.ReadFile(got) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, []byte("valid")) { + t.Errorf("JoinChunks() included non-numeric suffix file; content = %q", data) + } + }) +} + +// -------------------------------------------------------------------------- +// SplitArchive + JoinChunks round-trip +// -------------------------------------------------------------------------- + +func TestSplitJoinChunks_RoundTrip(t *testing.T) { + ctx := testContext(t) + + original := make([]byte, 1000) + for i := range original { + original[i] = byte(i % 256) + } + + dir := t.TempDir() + archivePath := filepath.Join(dir, "haul.tar.zst") + if err := os.WriteFile(archivePath, original, 0o644); err != nil { + t.Fatal(err) + } + + chunks, err := SplitArchive(ctx, archivePath, 100) + if err != nil { + t.Fatalf("SplitArchive() error = %v", err) + } + if len(chunks) == 0 { + t.Fatal("SplitArchive() returned no chunks") + } + + 
// chunkSuffixRe matches a filename stem ending in "_<digits>", capturing the
// base stem and the numeric chunk index.
var chunkSuffixRe = regexp.MustCompile(`^(.+)_(\d+)$`)

// chunkInfo reports whether archivePath follows the chunk naming pattern
// (<base>_<N><ext>). On a match it returns the base path without the index,
// the compound extension (e.g. ".tar.zst"), the numeric chunk index, and true;
// otherwise ok is false.
func chunkInfo(archivePath string) (base, ext string, index int, ok bool) {
	dir, file := filepath.Split(archivePath)

	// peel off every extension so "haul_3.tar.zst" yields stem "haul_3"
	// and compound extension ".tar.zst"
	stem := file
	var compoundExt string
	for {
		e := filepath.Ext(stem)
		if e == "" {
			break
		}
		compoundExt = e + compoundExt
		stem = strings.TrimSuffix(stem, e)
	}

	m := chunkSuffixRe.FindStringSubmatch(stem)
	if m == nil {
		return "", "", 0, false
	}

	idx, _ := strconv.Atoi(m[2])
	return filepath.Join(dir, m[1]), compoundExt, idx, true
}
+func JoinChunks(ctx context.Context, archivePath, tempDir string) (string, error) { + l := log.FromContext(ctx) + + base, ext, _, ok := chunkInfo(archivePath) + if !ok { + return archivePath, nil + } + + all, err := filepath.Glob(base + "_*" + ext) + if err != nil { + return archivePath, nil + } + var matches []string + for _, m := range all { + if _, _, _, ok := chunkInfo(m); ok { + matches = append(matches, m) + } + } + if len(matches) == 0 { + return archivePath, nil + } + + sort.Slice(matches, func(i, j int) bool { + _, _, idxI, _ := chunkInfo(matches[i]) + _, _, idxJ, _ := chunkInfo(matches[j]) + return idxI < idxJ + }) + + l.Debugf("joining %d chunk(s) for [%s]", len(matches), base) + + joinedPath := filepath.Join(tempDir, filepath.Base(base)+ext) + outf, err := os.Create(joinedPath) + if err != nil { + return "", fmt.Errorf("failed to create joined archive: %w", err) + } + defer outf.Close() + + for _, chunk := range matches { + l.Debugf("joining chunk [%s]", chunk) + cf, err := os.Open(chunk) + if err != nil { + return "", fmt.Errorf("failed to open chunk [%s]: %w", chunk, err) + } + if _, err := io.Copy(outf, cf); err != nil { + cf.Close() + return "", fmt.Errorf("failed to copy chunk [%s]: %w", chunk, err) + } + cf.Close() + } + + l.Infof("joined %d chunk(s) into [%s]", len(matches), filepath.Base(joinedPath)) + return joinedPath, nil +}