mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-04-15 01:41:56 +00:00
635 lines
19 KiB
Go
635 lines
19 KiB
Go
// Copyright 2022 Woodpecker Authors
|
|
// Copyright 2021 Informatyka Boguslawski sp. z o.o. sp.k., http://www.ib.pl/
|
|
// Copyright 2018 Drone.IO Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rpc
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"maps"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/oklog/ulid/v2"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/rs/zerolog/log"
|
|
grpc_metadata "google.golang.org/grpc/metadata"
|
|
|
|
"go.woodpecker-ci.org/woodpecker/v3/rpc"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/forge"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/logging"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/model"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/pubsub"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/queue"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/scheduler"
|
|
"go.woodpecker-ci.org/woodpecker/v3/server/store"
|
|
)
|
|
|
|
// updateAgentLastWorkDelay the delay before the LastWork info should be updated.
|
|
const updateAgentLastWorkDelay = time.Minute
|
|
|
|
type RPC struct {
|
|
scheduler scheduler.Scheduler
|
|
logger logging.Log
|
|
store store.Store
|
|
pipelineTime *prometheus.GaugeVec
|
|
pipelineCount *prometheus.CounterVec
|
|
}
|
|
|
|
// Next blocks until it provides the next workflow to execute.
|
|
// TODO (6038): Server does not release waiting agents on graceful shutdown.
|
|
func (s *RPC) Next(c context.Context, agentFilter rpc.Filter) (*rpc.Workflow, error) {
|
|
if hostname, err := s.getHostnameFromContext(c); err == nil {
|
|
log.Debug().Msgf("agent connected: %s: polling", hostname)
|
|
}
|
|
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if agent.NoSchedule {
|
|
time.Sleep(1 * time.Second)
|
|
return nil, nil
|
|
}
|
|
|
|
agentServerLabels, err := agent.GetServerLabels()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// enforce labels from server by overwriting agent labels
|
|
maps.Copy(agentFilter.Labels, agentServerLabels)
|
|
|
|
log.Trace().Msgf("Agent %s[%d] tries to pull task with labels: %v", agent.Name, agent.ID, agentFilter.Labels)
|
|
|
|
filterFn := createFilterFunc(agentFilter)
|
|
|
|
for {
|
|
// poll blocks until a task is available or the context is canceled / worker is kicked
|
|
task, err := s.scheduler.Poll(c, agent.ID, filterFn)
|
|
if err != nil || task == nil {
|
|
return nil, err
|
|
}
|
|
|
|
if task.ShouldRun() {
|
|
workflow := new(rpc.Workflow)
|
|
err = json.Unmarshal(task.Data, workflow)
|
|
return workflow, err
|
|
}
|
|
|
|
// task should not run, so mark it as done
|
|
if err := s.Done(c, task.ID, rpc.WorkflowState{}); err != nil {
|
|
log.Error().Err(err).Msgf("marking workflow task '%s' as done failed", task.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Wait blocks until the workflow with the given ID is completed or got canceled.
|
|
// Used to let agents wait for cancel signals from server side.
|
|
func (s *RPC) Wait(c context.Context, workflowID string) (canceled bool, err error) {
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if err := s.scheduler.Wait(c, workflowID); err != nil {
|
|
if errors.Is(err, queue.ErrCancel) {
|
|
// we explicit send a cancel signal
|
|
log.Debug().Str("workflowID", workflowID).Msg("while waiting the queue reported the workflow as canceled")
|
|
return true, nil
|
|
}
|
|
// unknown error happened
|
|
log.Error().Err(err).Str("workflowID", workflowID).Msg("while waiting the queue returned an unexpected error")
|
|
return false, err
|
|
}
|
|
|
|
// workflow finished and on issues appeared
|
|
log.Debug().Str("workflowID", workflowID).Msg("queue reported the workflow as finished")
|
|
return false, nil
|
|
}
|
|
|
|
// Extend extends the lease for the workflow with the given ID.
|
|
func (s *RPC) Extend(c context.Context, workflowID string) error {
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.updateAgentLastWork(agent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, workflowID, nil, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
return s.scheduler.Extend(c, agent.ID, workflowID)
|
|
}
|
|
|
|
// Update let agent updates the step state at the server.
|
|
func (s *RPC) Update(c context.Context, strWorkflowID string, state rpc.StepState) error {
|
|
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
workflow, err := s.store.WorkflowLoad(workflowID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("rpc.update: cannot find workflow with id %d", workflowID)
|
|
return err
|
|
}
|
|
|
|
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
|
|
return err
|
|
}
|
|
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
step, err := s.store.StepByUUID(state.StepUUID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find step with uuid %s", state.StepUUID)
|
|
return err
|
|
}
|
|
|
|
if step.PipelineID != currentPipeline.ID {
|
|
msg := fmt.Sprintf("agent returned status with step uuid '%s' which does not belong to current pipeline", state.StepUUID)
|
|
log.Error().
|
|
Int64("stepPipelineID", step.PipelineID).
|
|
Int64("currentPipelineID", currentPipeline.ID).
|
|
Msg(msg)
|
|
return errors.New(msg)
|
|
}
|
|
|
|
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID)
|
|
return err
|
|
}
|
|
|
|
// check before agent can alter some state
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
|
|
return err
|
|
}
|
|
|
|
// sanitize agent input: only allow step updates that the workflow state permits
|
|
if err := checkWorkflowAllowsStepUpdate(workflow.State, step, state); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := pipeline.UpdateStepStatus(c, s.store, step, state); err != nil {
|
|
log.Error().Err(err).Msg("rpc.update: cannot update step")
|
|
}
|
|
|
|
if state.Exited {
|
|
server.Config.Services.LogStore.StepFinished(step)
|
|
}
|
|
|
|
if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil {
|
|
log.Error().Err(err).Msg("cannot build tree from step list")
|
|
return err
|
|
}
|
|
|
|
return s.notify(c, repo, currentPipeline)
|
|
}
|
|
|
|
// Init signals the workflow is initialized.
|
|
func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
|
|
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
workflow, err := s.store.WorkflowLoad(workflowID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find workflow with id %d", workflowID)
|
|
return err
|
|
}
|
|
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
workflow.AgentID = agent.ID
|
|
|
|
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
|
|
return err
|
|
}
|
|
|
|
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID)
|
|
return err
|
|
}
|
|
|
|
// check before agent can alter some state
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
|
|
return err
|
|
}
|
|
|
|
// check workflow's own state to prevent re-initializing a finished or blocked workflow
|
|
if err := checkWorkflowState(workflow.State); err != nil {
|
|
return err
|
|
}
|
|
|
|
if currentPipeline.Status == model.StatusPending {
|
|
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
|
|
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
|
|
}
|
|
}
|
|
|
|
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
|
|
|
defer func() {
|
|
currentPipeline.Workflows, _ = s.store.WorkflowGetTree(currentPipeline)
|
|
|
|
if err := s.notify(c, repo, currentPipeline); err != nil {
|
|
log.Error().Err(err).Msg("could not publish pipeline state change to pubsub")
|
|
}
|
|
}()
|
|
|
|
workflow, err = pipeline.UpdateWorkflowStatusToRunning(s.store, *workflow, state)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
|
|
|
return s.updateAgentLastWork(agent)
|
|
}
|
|
|
|
// Done marks the workflow with the given ID as stopped.
|
|
func (s *RPC) Done(c context.Context, strWorkflowID string, state rpc.WorkflowState) error {
|
|
workflowID, err := strconv.ParseInt(strWorkflowID, 10, 64)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
workflow, err := s.store.WorkflowLoad(workflowID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find workflow with id %d", workflowID)
|
|
return err
|
|
}
|
|
|
|
workflow.Children, err = s.store.StepListFromWorkflowFind(workflow)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
currentPipeline, err := s.store.GetPipeline(workflow.PipelineID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find pipeline with id %d", workflow.PipelineID)
|
|
return err
|
|
}
|
|
|
|
repo, err := s.store.GetRepo(currentPipeline.RepoID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find repo with id %d", currentPipeline.RepoID)
|
|
return err
|
|
}
|
|
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// check before agent can alter some state
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, strWorkflowID, currentPipeline, repo); err != nil {
|
|
return err
|
|
}
|
|
|
|
// check workflow's own state to prevent finishing an already-finished or blocked workflow
|
|
if err := checkWorkflowState(workflow.State); err != nil {
|
|
return err
|
|
}
|
|
|
|
logger := log.With().
|
|
Str("repo_id", fmt.Sprint(repo.ID)).
|
|
Str("pipeline_id", fmt.Sprint(currentPipeline.ID)).
|
|
Str("workflow_id", strWorkflowID).Logger()
|
|
|
|
logger.Debug().Msgf("workflow state in store: %#v", workflow)
|
|
logger.Debug().Msgf("gRPC Done with state: %#v", state)
|
|
|
|
// Complete any still-running children (e.g. service containers) before
|
|
// computing the workflow status, so their final state is reflected.
|
|
s.completeChildrenIfParentCompleted(workflow, state.Finished)
|
|
|
|
if workflow, err = pipeline.UpdateWorkflowStatusToDone(s.store, *workflow, state); err != nil {
|
|
logger.Error().Err(err).Msgf("pipeline.UpdateWorkflowStatusToDone: cannot update workflow state: %s", err)
|
|
}
|
|
|
|
var queueErr error
|
|
if !state.Canceled {
|
|
if workflow.Failing() {
|
|
queueErr = s.scheduler.Error(c, strWorkflowID, fmt.Errorf("workflow finished with error %s", state.Error))
|
|
} else {
|
|
queueErr = s.scheduler.Done(c, strWorkflowID, workflow.State)
|
|
}
|
|
} else {
|
|
if workflow.Started > 0 {
|
|
queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusKilled)
|
|
} else {
|
|
queueErr = s.scheduler.Done(c, strWorkflowID, model.StatusCanceled)
|
|
}
|
|
}
|
|
if queueErr != nil {
|
|
logger.Error().Err(queueErr).Msg("queue.Done: cannot ack workflow")
|
|
}
|
|
|
|
currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !model.IsThereRunningStage(currentPipeline.Workflows) {
|
|
if currentPipeline, err = pipeline.UpdateStatusToDone(s.store, *currentPipeline, pipeline.PipelineStatus(currentPipeline.Workflows), workflow.Finished); err != nil {
|
|
logger.Error().Err(err).Msgf("pipeline.UpdateStatusToDone: cannot update workflows final state")
|
|
}
|
|
}
|
|
|
|
s.updateForgeStatus(c, repo, currentPipeline, workflow)
|
|
|
|
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
|
|
go func() {
|
|
for _, step := range workflow.Children {
|
|
if step.State != model.StatusSkipped {
|
|
if err := s.logger.Close(c, step.ID); err != nil {
|
|
logger.Error().Err(err).Msgf("done: cannot close log stream for step %d", step.ID)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
if err := s.notify(c, repo, currentPipeline); err != nil {
|
|
return err
|
|
}
|
|
|
|
if currentPipeline.Status == model.StatusSuccess || currentPipeline.Status == model.StatusFailure {
|
|
s.pipelineCount.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Inc()
|
|
s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(currentPipeline.Status), "total").Set(float64(currentPipeline.Finished - currentPipeline.Started))
|
|
}
|
|
if currentPipeline.IsMultiPipeline() {
|
|
s.pipelineTime.WithLabelValues(repo.FullName, currentPipeline.Branch, string(workflow.State), workflow.Name).Set(float64(workflow.Finished - workflow.Started))
|
|
}
|
|
|
|
return s.updateAgentLastWork(agent)
|
|
}
|
|
|
|
// Log writes a log entry to the database and publishes it to the pubsub.
|
|
// An explicit stepUUID makes it obvious that all entries must come from the same step.
|
|
func (s *RPC) Log(c context.Context, stepUUID string, rpcLogEntries []*rpc.LogEntry) error {
|
|
step, err := s.store.StepByUUID(stepUUID)
|
|
if err != nil {
|
|
return fmt.Errorf("could not find step with uuid %s in store: %w", stepUUID, err)
|
|
}
|
|
|
|
agent, err := s.getAgentFromContext(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
currentPipeline, err := s.store.GetPipeline(step.PipelineID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot find pipeline with id %d", step.PipelineID)
|
|
return err
|
|
}
|
|
|
|
// check before agent can alter some state
|
|
if err := s.checkAgentPermissionByWorkflow(c, agent, "", currentPipeline, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
// sanitize agent input
|
|
if err := allowAppendingLogs(currentPipeline, step); err != nil {
|
|
return fmt.Errorf("can not alter logs: %w", err)
|
|
}
|
|
|
|
err = s.updateAgentLastWork(agent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var logEntries []*model.LogEntry
|
|
|
|
for _, rpcLogEntry := range rpcLogEntries {
|
|
if rpcLogEntry.StepUUID != stepUUID {
|
|
return fmt.Errorf("expected step UUID %s, got %s", stepUUID, rpcLogEntry.StepUUID)
|
|
}
|
|
logEntries = append(logEntries, &model.LogEntry{
|
|
StepID: step.ID,
|
|
Time: rpcLogEntry.Time,
|
|
Line: rpcLogEntry.Line,
|
|
Data: rpcLogEntry.Data,
|
|
Type: model.LogEntryType(rpcLogEntry.Type),
|
|
})
|
|
}
|
|
|
|
// make sure writes to pubsub are non blocking (https://github.com/woodpecker-ci/woodpecker/blob/c919f32e0b6432a95e1a6d3d0ad662f591adf73f/server/logging/log.go#L9)
|
|
go func() {
|
|
// write line to listening web clients
|
|
if err := s.logger.Write(c, step.ID, logEntries); err != nil {
|
|
log.Error().Err(err).Msgf("rpc server could not write to logger")
|
|
}
|
|
}()
|
|
|
|
if err = server.Config.Services.LogStore.LogAppend(step, logEntries); err != nil {
|
|
log.Error().Err(err).Msg("could not store log entries")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *RPC) RegisterAgent(ctx context.Context, info rpc.AgentInfo) (int64, error) {
|
|
agent, err := s.getAgentFromContext(ctx)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
if agent.Name == "" {
|
|
if hostname, err := s.getHostnameFromContext(ctx); err == nil {
|
|
agent.Name = hostname
|
|
}
|
|
}
|
|
|
|
agent.Backend = info.Backend
|
|
agent.Platform = info.Platform
|
|
agent.Capacity = int32(info.Capacity)
|
|
agent.Version = info.Version
|
|
agent.CustomLabels = info.CustomLabels
|
|
|
|
err = s.store.AgentUpdate(agent)
|
|
if err != nil {
|
|
return -1, err
|
|
}
|
|
|
|
return agent.ID, nil
|
|
}
|
|
|
|
// UnregisterAgent removes the agent from the database.
|
|
func (s *RPC) UnregisterAgent(ctx context.Context) error {
|
|
agent, err := s.getAgentFromContext(ctx)
|
|
if !agent.IsSystemAgent() {
|
|
// registered with individual agent token -> do not unregister
|
|
return nil
|
|
}
|
|
log.Debug().Msgf("un-registering agent with ID %d", agent.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.store.AgentDelete(agent)
|
|
|
|
return err
|
|
}
|
|
|
|
func (s *RPC) ReportHealth(ctx context.Context, status string) error {
|
|
agent, err := s.getAgentFromContext(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if status != "I am alive!" {
|
|
//nolint:staticcheck
|
|
return errors.New("Are you alive?")
|
|
}
|
|
|
|
agent.LastContact = time.Now().Unix()
|
|
|
|
return s.store.AgentUpdate(agent)
|
|
}
|
|
|
|
func (s *RPC) completeChildrenIfParentCompleted(completedWorkflow *model.Workflow, finished int64) {
|
|
for _, c := range completedWorkflow.Children {
|
|
if c.Running() {
|
|
if updated, err := pipeline.UpdateStepToStatusSkipped(s.store, *c, finished, model.StatusKilled); err != nil {
|
|
log.Error().Err(err).Msgf("done: cannot update step_id %d child state", c.ID)
|
|
} else {
|
|
// Update in-memory state so WorkflowStatus sees the final state
|
|
c.State = updated.State
|
|
c.Finished = updated.Finished
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *RPC) updateForgeStatus(ctx context.Context, repo *model.Repo, pipeline *model.Pipeline, workflow *model.Workflow) {
|
|
user, err := s.store.GetUser(repo.UserID)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("cannot get user with id '%d'", repo.UserID)
|
|
return
|
|
}
|
|
|
|
_forge, err := server.Config.Services.Manager.ForgeFromRepo(repo)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("can not get forge for repo '%s'", repo.FullName)
|
|
return
|
|
}
|
|
|
|
forge.Refresh(ctx, _forge, s.store, user)
|
|
|
|
// only do status updates for parent steps
|
|
if workflow != nil {
|
|
err = _forge.Status(ctx, user, repo, pipeline, workflow)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, pipeline.Number)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Notify push to our pubsub infra pipeline state changes.
|
|
func (s *RPC) notify(c context.Context, repo *model.Repo, pipeline *model.Pipeline) (err error) {
|
|
message := pubsub.Message{ID: ulid.Make().String()}
|
|
message.Data, err = json.Marshal(model.Event{
|
|
Repo: *repo,
|
|
Pipeline: *pipeline,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("can't marshal JSON: %w", err)
|
|
}
|
|
|
|
subTopics := make(map[string]struct{})
|
|
// if repo is public, push to public topic
|
|
if !repo.IsSCMPrivate {
|
|
subTopics[pubsub.PublicTopic] = struct{}{}
|
|
}
|
|
// publish to repo specific topic
|
|
subTopics[pubsub.GetRepoTopic(repo)] = struct{}{}
|
|
|
|
return s.scheduler.Publish(c, subTopics, message)
|
|
}
|
|
|
|
func (s *RPC) getAgentFromContext(ctx context.Context) (*model.Agent, error) {
|
|
md, ok := grpc_metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
return nil, errors.New("metadata is not provided")
|
|
}
|
|
|
|
values := md["agent_id"]
|
|
if len(values) == 0 {
|
|
return nil, errors.New("agent_id is not provided")
|
|
}
|
|
|
|
_agentID := values[0]
|
|
agentID, err := strconv.ParseInt(_agentID, 10, 64)
|
|
if err != nil {
|
|
return nil, errors.New("agent_id is not a valid integer")
|
|
}
|
|
|
|
return s.store.AgentFind(agentID)
|
|
}
|
|
|
|
func (s *RPC) getHostnameFromContext(ctx context.Context) (string, error) {
|
|
metadata, ok := grpc_metadata.FromIncomingContext(ctx)
|
|
if ok {
|
|
hostname, ok := metadata["hostname"]
|
|
if ok && len(hostname) != 0 {
|
|
return hostname[0], nil
|
|
}
|
|
}
|
|
return "", errors.New("no hostname in metadata")
|
|
}
|
|
|
|
func (s *RPC) updateAgentLastWork(agent *model.Agent) error {
|
|
// only update agent.LastWork if not recently updated
|
|
if time.Unix(agent.LastWork, 0).Add(updateAgentLastWorkDelay).After(time.Now()) {
|
|
return nil
|
|
}
|
|
|
|
agent.LastWork = time.Now().Unix()
|
|
if err := s.store.AgentUpdate(agent); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|