mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2026-02-13 21:00:00 +00:00
Co-authored-by: pnkcaht <samzoovsk19@gmail.com> Co-authored-by: qwerty287 <80460567+qwerty287@users.noreply.github.com> Co-authored-by: Lauris B <lauris@nix.lv>
306 lines
13 KiB
Go
306 lines
13 KiB
Go
// Copyright 2021 Woodpecker Authors
|
|
// Copyright 2011 Drone.IO Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rpc
|
|
|
|
import "context"
|
|
|
|
// Peer defines the bidirectional communication interface between Woodpecker agents and servers.
|
|
//
|
|
// # Architecture and Implementations
|
|
//
|
|
// The Peer interface is implemented differently on each side of the communication:
|
|
//
|
|
// - Agent side: Implemented by agent/rpc/client_grpc.go's client struct, which wraps
|
|
// a gRPC client connection to make RPC calls to the server.
|
|
//
|
|
// - Server side: Implemented by server/rpc/rpc.go's RPC struct, which contains the
|
|
// business logic and is wrapped by server/rpc/server.go's WoodpeckerServer struct
|
|
// to handle incoming gRPC requests.
|
|
//
|
|
// # Thread Safety and Concurrency
|
|
//
|
|
// - Implementations must be safe for concurrent calls across different workflows
|
|
// - The same Peer instance may be called concurrently from multiple goroutines
|
|
// - Each workflow is identified by a unique workflowID string
|
|
// - Implementations must properly isolate workflow state using workflowID
|
|
//
|
|
// # Error Handling Conventions
|
|
//
|
|
// - Methods return errors for communication failures, validation errors, or server-side issues
|
|
// - Errors should not be used for business logic
|
|
// - Network/transport errors should be retried by the caller when appropriate
|
|
// - Nil error indicates successful operation
|
|
// - Context cancellation should return nil or context.Canceled, not a custom error
|
|
// - Business logic errors (e.g., workflow not found) return specific error types
|
|
//
|
|
// # Intended Execution Flow
|
|
//
|
|
// 1. Agent Lifecycle:
|
|
// - Version() checks compatibility with server
|
|
// - RegisterAgent() announces agent availability
|
|
// - ReportHealth() periodically confirms agent is alive
|
|
// - UnregisterAgent() gracefully disconnects agent
|
|
//
|
|
// 2. Workflow Execution (may happen concurrently for multiple workflows):
|
|
// - Next() blocks until server assigns a workflow
|
|
// - Init() signals workflow execution has started
|
|
// - Wait() (in background goroutine) monitors for cancellation signals
|
|
// - Update() reports step state changes as workflow progresses
|
|
// - EnqueueLog() streams log output from steps
|
|
// - Extend() extends workflow timeout if needed so queue does not reschedule it as retry
|
|
// - Done() signals workflow has completed
|
|
//
|
|
// 3. Cancellation Flow:
|
|
// - Server can cancel workflow by releasing Wait() with canceled=true
|
|
// - Agent detects cancellation from Wait() return value
|
|
// - Agent stops workflow execution and calls Done() with canceled state
|
|
type Peer interface {
|
|
// Version returns the server and gRPC protocol version information.
|
|
//
|
|
// This is typically called once during agent initialization to verify
|
|
// compatibility between agent and server versions.
|
|
//
|
|
// Returns:
|
|
// - Version with server version string and gRPC protocol version number
|
|
// - Error if communication fails or server is unreachable
|
|
Version(c context.Context) (*Version, error)
|
|
|
|
// Next blocks until the server provides the next workflow to execute from the queue.
|
|
//
|
|
// This is the primary work-polling mechanism. Agents call this repeatedly in a loop,
|
|
// and it blocks until either:
|
|
// 1. A workflow matching the filter becomes available
|
|
// 2. The context is canceled (agent shutdown, network timeout, etc.)
|
|
//
|
|
// The filter allows agents to specify capabilities via labels (e.g., platform,
|
|
// backend type) so the server only assigns compatible workflows.
|
|
//
|
|
// Context Handling:
|
|
// - This is a long-polling operation that may block for extended periods
|
|
// - Implementations MUST check context regularly (not just at entry)
|
|
// - When context is canceled, must return nil workflow and nil error
|
|
// - Server may send keep-alive signals or periodically return nil to allow reconnection
|
|
//
|
|
// Returns:
|
|
// - Workflow object with ID, Config, and Workflow.Timeout if work is available
|
|
// - nil, nil if context is canceled or no work available (retry expected)
|
|
// - nil, error if a non-retryable error occurs
|
|
Next(c context.Context, f Filter) (*Workflow, error)
|
|
|
|
// Wait blocks until the workflow with the given ID completes or is canceled by the server.
|
|
//
|
|
// This is used by agents to monitor for server-side cancellation signals. Typically
|
|
// called in a background goroutine immediately after Init(), running concurrently
|
|
// with workflow execution.
|
|
//
|
|
// The method serves two purposes:
|
|
// 1. Signals when server wants to cancel workflow (canceled=true)
|
|
// 2. Unblocks when workflow completes normally on agent (canceled=false)
|
|
//
|
|
// Context Handling:
|
|
// - This is a long-running blocking operation for the workflow duration
|
|
// - Context cancellation indicates shutdown, not workflow cancellation
|
|
// - When context is canceled, should return (false, nil) or (false, ctx.Err())
|
|
// - Must not confuse context cancellation with workflow cancellation signal
|
|
//
|
|
// Cancellation Flow:
|
|
// - Server releases Wait() with canceled=true → agent should stop workflow
|
|
// - Agent completes workflow normally → Done() is called → server releases Wait() with canceled=false
|
|
// - Agent context canceled → Wait() returns immediately, workflow may continue on agent
|
|
//
|
|
// Returns:
|
|
// - canceled=true, err=nil: Server initiated cancellation, agent should stop workflow
|
|
// - canceled=false, err=nil: Workflow completed normally (Wait unblocked by Done call)
|
|
// - canceled=false, err!=nil: Communication error, agent should retry or handle error
|
|
Wait(c context.Context, workflowID string) (canceled bool, err error)
|
|
|
|
// Init signals to the server that the workflow has been initialized and execution has started.
|
|
//
|
|
// This is called once per workflow immediately after the agent accepts it from Next()
|
|
// and before starting step execution. It allows the server to track workflow start time
|
|
// and update workflow status to "running".
|
|
//
|
|
// The WorkflowState should have:
|
|
// - Started: Unix timestamp when execution began
|
|
// - Finished: 0 (not finished yet)
|
|
// - Error: empty string (no error yet)
|
|
// - Canceled: false (not canceled yet)
|
|
//
|
|
// Returns:
|
|
// - nil on success
|
|
// - error if communication fails or server rejects the state
|
|
Init(c context.Context, workflowID string, state WorkflowState) error
|
|
|
|
// Done signals to the server that the workflow has completed execution.
|
|
//
|
|
// This is called once per workflow after all steps have finished (or workflow was canceled).
|
|
// It provides the final workflow state including completion time, any errors, and
|
|
// cancellation status.
|
|
//
|
|
// The WorkflowState should have:
|
|
// - Started: Unix timestamp when execution began (same as Init)
|
|
// - Finished: Unix timestamp when execution completed
|
|
// - Error: Error message if workflow failed, empty if successful
|
|
// - Canceled: true if workflow was canceled, false otherwise
|
|
//
|
|
// After Done() is called:
|
|
// - Server updates final workflow status in database
|
|
// - Server releases any Wait() calls for this workflow
|
|
// - Server removes workflow from active queue
|
|
// - Server notifies forge of workflow completion
|
|
//
|
|
// Context Handling:
|
|
// - MUST attempt to complete even if workflow context is canceled
|
|
// - Often called with a shutdown/cleanup context rather than workflow context
|
|
// - Critical for proper cleanup - should retry on transient failures
|
|
//
|
|
// Returns:
|
|
// - nil on success
|
|
// - error if communication fails or server rejects the state
|
|
Done(c context.Context, workflowID string, state WorkflowState) error
|
|
|
|
// Extend extends the timeout for the workflow with the given ID in the task queue.
|
|
//
|
|
// Agents must call Extend() regularly (e.g., every constant.TaskTimeout / 3) to signal
|
|
// that the workflow is still actively executing and prevent premature timeout.
|
|
//
|
|
// If agents don't call Extend periodically, the workflow will be rescheduled to a new
|
|
// agent after the timeout period expires (specified in constant.TaskTimeout).
|
|
//
|
|
// This acts as a heartbeat mechanism to detect stuck workflow executions. If an agent
|
|
// dies or becomes unresponsive, the server will eventually timeout the workflow and
|
|
// reassign it.
|
|
//
|
|
// IMPORTANT: Don't confuse this with Workflow.Timeout returned by Next() - they serve
|
|
// different purposes!
|
|
//
|
|
// Returns:
|
|
// - nil on success (timeout was extended)
|
|
// - error if communication fails or workflow is not found
|
|
Extend(c context.Context, workflowID string) error
|
|
|
|
// Update reports step state changes to the server as the workflow progresses.
|
|
//
|
|
// This is called multiple times per step:
|
|
// 1. When step starts (Exited=false, Finished=0)
|
|
// 2. When step completes (Exited=true, Finished and ExitCode set)
|
|
// 3. Potentially on progress updates if step has long-running operations
|
|
//
|
|
// The server uses these updates to:
|
|
// - Track step execution progress
|
|
// - Update UI with real-time status
|
|
// - Store step results in database
|
|
// - Calculate workflow completion
|
|
//
|
|
// Context Handling:
|
|
// - Failures should be logged but not block workflow execution
|
|
//
|
|
// Returns:
|
|
// - nil on success
|
|
// - error if communication fails or server rejects the state
|
|
Update(c context.Context, workflowID string, state StepState) error
|
|
|
|
// EnqueueLog queues a log entry for delayed batch sending to the server.
|
|
//
|
|
// Log entries are produced continuously during step execution and need to be
|
|
// transmitted efficiently. This method adds logs to an internal queue that
|
|
// batches and sends them periodically to reduce network overhead.
|
|
//
|
|
// The implementation should:
|
|
// - Queue the log entry in a memory buffer
|
|
// - Batch multiple entries together
|
|
// - Send batches periodically (e.g., every second) or when buffer fills
|
|
// - Handle backpressure if server is slow or network is congested
|
|
//
|
|
// Unlike other methods, EnqueueLog:
|
|
// - Does NOT take a context parameter (fire-and-forget)
|
|
// - Does NOT return an error (never blocks the caller)
|
|
// - Does NOT guarantee immediate transmission
|
|
//
|
|
// Thread Safety:
|
|
// - MUST be safe to call concurrently from multiple goroutines
|
|
// - May be called concurrently from different steps/workflows
|
|
// - Internal queue must be properly synchronized
|
|
EnqueueLog(logEntry *LogEntry)
|
|
|
|
// RegisterAgent announces this agent to the server and returns an agent ID.
|
|
//
|
|
// This is called once during agent startup to:
|
|
// - Create an agent record in the server database
|
|
// - Obtain a unique agent ID for subsequent requests
|
|
// - Declare agent capabilities (platform, backend, capacity, labels)
|
|
// - Enable server-side agent tracking and monitoring
|
|
//
|
|
// The AgentInfo should specify:
|
|
// - Version: Agent version string (e.g., "v2.0.0")
|
|
// - Platform: OS/architecture (e.g., "linux/amd64")
|
|
// - Backend: Execution backend (e.g., "docker", "kubernetes")
|
|
// - Capacity: Maximum concurrent workflows (e.g., 2)
|
|
// - CustomLabels: Additional key-value labels for filtering
|
|
//
|
|
// Context Handling:
|
|
// - Context cancellation indicates agent is aborting startup
|
|
// - Should not retry indefinitely - fail fast on persistent errors
|
|
//
|
|
// Returns:
|
|
// - agentID: Unique identifier for this agent (use in subsequent calls)
|
|
// - error: If registration fails
|
|
RegisterAgent(ctx context.Context, info AgentInfo) (int64, error)
|
|
|
|
// UnregisterAgent removes this agent from the server's registry.
|
|
//
|
|
// This is called during graceful agent shutdown to:
|
|
// - Mark agent as offline in server database
|
|
// - Allow server to stop assigning workflows to this agent
|
|
// - Clean up any agent-specific server resources
|
|
// - Provide clean shutdown signal to monitoring systems
|
|
//
|
|
// After UnregisterAgent:
|
|
// - Agent should stop calling Next() for new work
|
|
// - Agent should complete any in-progress workflows
|
|
// - Agent may call Done() to finish existing workflows
|
|
// - Agent should close network connections
|
|
//
|
|
// Context Handling:
|
|
// - MUST attempt to complete even during forced shutdown
|
|
// - Often called with a shutdown context (limited time)
|
|
// - Failure is logged but should not prevent agent exit
|
|
//
|
|
// Returns:
|
|
// - nil on success
|
|
// - error if communication fails
|
|
UnregisterAgent(ctx context.Context) error
|
|
|
|
// ReportHealth sends a periodic health status update to the server.
|
|
//
|
|
// This is called regularly (e.g., every 30 seconds) during agent operation to:
|
|
// - Prove agent is still alive and responsive
|
|
// - Allow server to detect dead or stuck agents
|
|
// - Update agent's "last seen" timestamp in database
|
|
// - Provide application-level keepalive beyond network keep-alive signals
|
|
//
|
|
// Health reporting helps the server:
|
|
// - Mark unresponsive agents as offline
|
|
// - Redistribute work from dead agents
|
|
// - Display accurate agent status in UI
|
|
// - Trigger alerts for infrastructure issues
|
|
//
|
|
// Returns:
|
|
// - nil on success
|
|
// - error if communication fails
|
|
ReportHealth(c context.Context) error
|
|
}
|