diff --git a/edge/edge.go b/edge/edge.go index fc544d111..90585b6cc 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -3,6 +3,7 @@ package edge import ( "errors" "fmt" + "github.com/portainer/agent/edge/scheduler" "log" "time" @@ -18,7 +19,7 @@ type ( clusterService agent.ClusterService dockerInfoService agent.DockerInfoService key *edgeKey - logsManager *logsManager + logsManager *scheduler.LogsManager pollService *PollService pollServiceConfig *pollServiceConfig stackManager *StackManager @@ -74,8 +75,8 @@ func (manager *Manager) Start() error { } manager.stackManager = stackManager - manager.logsManager = newLogsManager(manager.key.PortainerInstanceURL, manager.key.EndpointID, manager.agentOptions.EdgeID, pollServiceConfig.InsecurePoll) - manager.logsManager.start() + manager.logsManager = scheduler.NewLogsManager(manager.key.PortainerInstanceURL, manager.key.EndpointID, manager.agentOptions.EdgeID, pollServiceConfig.InsecurePoll) + manager.logsManager.Start() pollService, err := newPollService(manager.stackManager, manager.logsManager, pollServiceConfig) if err != nil { diff --git a/edge/poll.go b/edge/poll.go index adb85d40c..4b8c2a6ab 100644 --- a/edge/poll.go +++ b/edge/poll.go @@ -38,7 +38,7 @@ type PollService struct { endpointID string tunnelServerAddr string tunnelServerFingerprint string - logsManager *logsManager + logsManager *scheduler.LogsManager containerPlatform agent.ContainerPlatform } @@ -56,7 +56,7 @@ type pollServiceConfig struct { } // newPollService returns a pointer to a new instance of PollService -func newPollService(edgeStackManager *StackManager, logsManager *logsManager, config *pollServiceConfig) (*PollService, error) { +func newPollService(edgeStackManager *StackManager, logsManager *scheduler.LogsManager, config *pollServiceConfig) (*PollService, error) { pollFrequency, err := time.ParseDuration(config.PollFrequency) if err != nil { return nil, err @@ -293,7 +293,7 @@ func (service *PollService) poll() error { } } - service.logsManager.handleReceivedLogsRequests(logsToCollect) + service.logsManager.HandleReceivedLogsRequests(logsToCollect) if responseData.CheckinInterval != service.pollIntervalInSeconds { log.Printf("[DEBUG] [internal,edge,poll] [old_interval: %f] [new_interval: %f] [message: updating poll interval]", service.pollIntervalInSeconds, responseData.CheckinInterval) diff --git a/edge/logs.go b/edge/scheduler/logs.go similarity index 88% rename from edge/logs.go rename to edge/scheduler/logs.go index d0c2220a8..31b8f4326 100644 --- a/edge/logs.go +++ b/edge/scheduler/logs.go @@ -1,4 +1,4 @@ -package edge +package scheduler import ( "fmt" @@ -10,7 +10,7 @@ import ( "github.com/portainer/agent/http/client" ) -type logsManager struct { +type LogsManager struct { httpClient *client.PortainerClient stopSignal chan struct{} jobs map[int]logStatus @@ -25,17 +25,17 @@ const ( logFailed ) -func newLogsManager(portainerURL, endpointID, edgeID string, insecurePoll bool) *logsManager { +func NewLogsManager(portainerURL, endpointID, edgeID string, insecurePoll bool) *LogsManager { cli := client.NewPortainerClient(portainerURL, endpointID, edgeID, insecurePoll) - return &logsManager{ + return &LogsManager{ httpClient: cli, stopSignal: nil, jobs: map[int]logStatus{}, } } -func (manager *logsManager) start() error { +func (manager *LogsManager) Start() error { if manager.stopSignal != nil { return nil } @@ -100,7 +100,7 @@ func (manager *logsManager) start() error { return nil } -func (manager *logsManager) stop() { +func (manager *LogsManager) stop() { if manager.stopSignal != nil { log.Printf("[DEBUG] [internal,edge,logs] [message: logs manager stopped]") close(manager.stopSignal) @@ -108,7 +108,7 @@ func (manager *logsManager) stop() { } } -func (manager *logsManager) handleReceivedLogsRequests(jobs []int) { +func (manager *LogsManager) HandleReceivedLogsRequests(jobs []int) { for _, jobID := range jobs { if _, ok := manager.jobs[jobID]; !ok { log.Printf("[DEBUG] [internal,edge,logs] [job_identifier: %d] [message: added job to queue]", jobID) @@ -117,7 +117,7 @@ func (manager *logsManager) handleReceivedLogsRequests(jobs []int) { } } -func (manager *logsManager) next() int { +func (manager *LogsManager) next() int { for jobID, status := range manager.jobs { if status == logPending { return jobID