Skip to content

Commit

Permalink
trigger initial mirror on repo via repoPool
Browse files Browse the repository at this point in the history
  • Loading branch information
asiyani committed Mar 25, 2024
1 parent 5054614 commit a5109e7
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 67 deletions.
35 changes: 31 additions & 4 deletions pkg/mirror/repo_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"time"
)

var (
Expand All @@ -20,6 +21,7 @@ type RepoPool struct {

// NewRepoPool will create mirror repositories based on given config and start loop
// RepoPool provides simple wrapper around Repository methods and used remote url to select repository
// remote repo will not be mirrored until either Mirror() or StartLoop() is called
func NewRepoPool(conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*RepoPool, error) {
if err := conf.ValidateDefaults(); err != nil {
return nil, err
Expand Down Expand Up @@ -52,22 +54,47 @@ func NewRepoPool(conf RepoPoolConfig, log *slog.Logger, commonENVs []string) (*R
return rp, nil
}

// AddRepository will add given repository to repoPool and
// start mirror loop if its not already started
// AddRepository will add given repository to repoPool
// remote repo will not be mirrored until either Mirror() or StartLoop() is called
func (rp *RepoPool) AddRepository(repo *Repository) error {
if repo, _ := rp.Repo(repo.remote); repo != nil {
return ErrExist
}

rp.repos = append(rp.repos, repo)

if !repo.running {
go repo.StartLoop(context.TODO())
return nil
}

// Mirror will trigger mirror on every repo in foreground with given timeout
// it will error out if any of the repository mirror errors
// ideally Mirror should be used for the first mirror to ensure repositories are
// successfully mirrored
func (rp *RepoPool) Mirror(ctx context.Context, timeout time.Duration) error {
for _, repo := range rp.repos {
mCtx, cancel := context.WithTimeout(ctx, timeout)
err := repo.Mirror(mCtx)
cancel()
if err != nil {
return fmt.Errorf("repository mirror failed err:%w", err)
}
}

return nil
}

// StartLoop will start mirror loop if its not already started
func (rp *RepoPool) StartLoop() error {
for _, repo := range rp.repos {
if !repo.running {
go repo.StartLoop(context.TODO())
continue
}
rp.log.Info("start loop is already running", "repo", repo.gitURL.repo)
}
return nil
}

// Repo will return Repository object based on given remote URL
func (rp *RepoPool) Repo(remote string) (*Repository, error) {
gitURL, err := ParseGitURL(remote)
Expand Down
56 changes: 56 additions & 0 deletions pkg/mirror/repo_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package mirror

import (
"os"
"path/filepath"
"testing"
)

func TestRepoPool_validateLinkPath(t *testing.T) {
root := "/tmp/root"

rpc := RepoPoolConfig{
Defaults: DefaultConfig{
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
},
Repositories: []RepositoryConfig{
{
Remote: "[email protected]:org/repo1.git",
Worktrees: []WorktreeConfig{{Link: "link1"}},
},
{
Remote: "[email protected]:org/repo2.git",
Worktrees: []WorktreeConfig{{Link: "link2"}},
},
},
}

rp, err := NewRepoPool(rpc, nil, testENVs)
if err != nil {
t.Fatalf("unexpected err:%s", err)
}

tests := []struct {
name string
repo *Repository
link string
wantErr bool
}{
{"add-repo2-link-to-repo1", rp.repos[0], "link2", true},
{"add-repo2-abs-link-to-repo1", rp.repos[0], filepath.Join(root, "link2"), true},
{"add-repo1-link-to-repo2", rp.repos[1], "link1", true},
{"add-repo1-abs-link-to-repo2", rp.repos[1], filepath.Join(root, "link1"), true},
{"add-new-link", rp.repos[0], "link3", false},
{"add-new-link", rp.repos[1], "link3", false},
{"add-new-abs-link", rp.repos[0], filepath.Join(os.TempDir(), "temp", "link1"), false},
{"add-new-abs-link", rp.repos[1], filepath.Join(os.TempDir(), "temp", "link2"), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

if err := rp.validateLinkPath(tt.repo, tt.link); (err != nil) != tt.wantErr {
t.Errorf("RepoPool.validateLinkPath() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
82 changes: 19 additions & 63 deletions pkg/mirror/z_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,10 @@ func Test_RepoPool_Success(t *testing.T) {
t.Fatalf("unexpected err:%s", err)
}

time.Sleep(time.Second)
// run initial mirror
if err := rp.Mirror(context.TODO(), testTimeout); err != nil {
t.Fatalf("unexpected err:%s", err)
}

// verify Hash and checked out files
if got, err := rp.Hash(txtCtx, remote1, "HEAD", ""); err != nil {
Expand Down Expand Up @@ -1069,11 +1072,21 @@ func Test_RepoPool_Success(t *testing.T) {

t.Log("TEST-2: forward both upstream and test mirrors")

// start mirror loop
if err := rp.StartLoop(); err != nil {
t.Fatalf("unexpected err:%s", err)
}
time.Sleep(time.Second)
// start mirror loop again this should be no op
if err := rp.StartLoop(); err != nil {
t.Fatalf("unexpected err:%s", err)
}

fileU1SHA2 := mustCommit(t, upstream1, "file", t.Name()+"-u1-main-2")
fileU2SHA2 := mustCommit(t, upstream2, "file", t.Name()+"-u2-main-2")

// wait for the mirror
time.Sleep(time.Second)
time.Sleep(2 * time.Second)

// verify Hash, commit msg and checked out files
if got, err := rp.Hash(txtCtx, remote1, "HEAD", ""); err != nil {
Expand Down Expand Up @@ -1213,6 +1226,10 @@ func Test_RepoPool_Error(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// start mirror loop
if err := rp.StartLoop(); err != nil {
t.Fatalf("unexpected err:%s", err)
}

time.Sleep(time.Second)

Expand Down Expand Up @@ -1274,67 +1291,6 @@ func Test_RepoPool_Error(t *testing.T) {
}
}

func TestRepoPool_validateLinkPath(t *testing.T) {
testTmpDir := mustTmpDir(t)
defer os.RemoveAll(testTmpDir)

upstream1 := filepath.Join(testTmpDir, testUpstreamRepo)
remote1 := "file://" + upstream1
upstream2 := filepath.Join(testTmpDir, "upstream2")
remote2 := "file://" + upstream2
root := filepath.Join(testTmpDir, testRoot)

t.Log("TEST-1: init both upstream and test mirrors")

mustInitRepo(t, upstream1, "file", t.Name()+"-u1-main-1")
mustInitRepo(t, upstream2, "file", t.Name()+"-u2-main-1")

rpc := RepoPoolConfig{
Defaults: DefaultConfig{
Root: root, Interval: testInterval, MirrorTimeout: testTimeout, GitGC: "always",
},
Repositories: []RepositoryConfig{
{
Remote: remote1,
Worktrees: []WorktreeConfig{{Link: "link1"}},
},
{
Remote: remote2,
Worktrees: []WorktreeConfig{{Link: "link2"}},
},
},
}

rp, err := NewRepoPool(rpc, nil, testENVs)
if err != nil {
t.Fatalf("unexpected err:%s", err)
}

tests := []struct {
name string
repo *Repository
link string
wantErr bool
}{
{"add-repo2-link-to-repo1", rp.repos[0], "link2", true},
{"add-repo2-abs-link-to-repo1", rp.repos[0], filepath.Join(root, "link2"), true},
{"add-repo1-link-to-repo2", rp.repos[1], "link1", true},
{"add-repo1-abs-link-to-repo2", rp.repos[1], filepath.Join(root, "link1"), true},
{"add-new-link", rp.repos[0], "link3", false},
{"add-new-link", rp.repos[1], "link3", false},
{"add-new-abs-link", rp.repos[0], filepath.Join(testTmpDir, "temp", "link1"), false},
{"add-new-abs-link", rp.repos[1], filepath.Join(testTmpDir, "temp", "link2"), false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {

if err := rp.validateLinkPath(tt.repo, tt.link); (err != nil) != tt.wantErr {
t.Errorf("RepoPool.validateLinkPath() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

// ##############################################
// HELPER FUNCS
// ##############################################
Expand Down

0 comments on commit a5109e7

Please sign in to comment.