mirror of
https://github.com/kubernetes/node-problem-detector.git
synced 2026-03-06 11:40:27 +00:00
Merge pull request #79 from Random-Liu/change-resync-mechanism
Update NPD to only do forcibly sync every 1 minutes.
This commit is contained in:
@@ -32,12 +32,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// updatePeriod is the period which condition manager checks update.
|
||||
// updatePeriod is the period at which condition manager checks update.
|
||||
updatePeriod = 1 * time.Second
|
||||
// updateTimeout is the timeout of condition update operation.
|
||||
updateTimeout = 10 * time.Second
|
||||
// resyncPeriod is the period which condition manager does resync no matter whether these is any update.
|
||||
resyncPeriod = 30 * time.Second
|
||||
// resyncPeriod is the period at which condition manager does resync, only updates when needed.
|
||||
resyncPeriod = 10 * time.Second
|
||||
// heartbeatPeriod is the period at which condition manager does forcibly sync with apiserver.
|
||||
heartbeatPeriod = 1 * time.Minute
|
||||
)
|
||||
|
||||
// ConditionManager synchronizes node conditions with the apiserver with problem client.
|
||||
@@ -52,17 +52,21 @@ const (
|
||||
type ConditionManager interface {
|
||||
// Start starts the condition manager.
|
||||
Start()
|
||||
// UpdateCondition update specific condition.
|
||||
// UpdateCondition updates a specific condition.
|
||||
UpdateCondition(types.Condition)
|
||||
}
|
||||
|
||||
type conditionManager struct {
|
||||
sync.Mutex
|
||||
clock clock.Clock
|
||||
latest time.Time
|
||||
client problemclient.Client
|
||||
updates map[string]types.Condition
|
||||
conditions map[string]types.Condition
|
||||
clock clock.Clock
|
||||
latestTry time.Time
|
||||
resyncNeeded bool
|
||||
client problemclient.Client
|
||||
// updatesLock is the lock protecting updates. Only the field `updates`
|
||||
// will be accessed by random caller and the sync routine, so only it
|
||||
// needs to be protected.
|
||||
updatesLock sync.Mutex
|
||||
updates map[string]types.Condition
|
||||
conditions map[string]types.Condition
|
||||
}
|
||||
|
||||
// NewConditionManager creates a condition manager.
|
||||
@@ -80,8 +84,8 @@ func (c *conditionManager) Start() {
|
||||
}
|
||||
|
||||
func (c *conditionManager) UpdateCondition(condition types.Condition) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
c.updatesLock.Lock()
|
||||
defer c.updatesLock.Unlock()
|
||||
// New node condition will override the old condition, because we only need the newest
|
||||
// condition for each condition type.
|
||||
c.updates[condition.Type] = condition
|
||||
@@ -92,17 +96,17 @@ func (c *conditionManager) syncLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-updateCh:
|
||||
if c.checkUpdates() || c.checkResync() {
|
||||
if c.needUpdates() || c.needResync() || c.needHeartbeat() {
|
||||
c.sync()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// checkUpdates checks whether there are recent updates.
|
||||
func (c *conditionManager) checkUpdates() bool {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// needUpdates checks whether there are recent updates.
|
||||
func (c *conditionManager) needUpdates() bool {
|
||||
c.updatesLock.Lock()
|
||||
defer c.updatesLock.Unlock()
|
||||
needUpdate := false
|
||||
for t, update := range c.updates {
|
||||
if !reflect.DeepEqual(c.conditions[t], update) {
|
||||
@@ -114,13 +118,21 @@ func (c *conditionManager) checkUpdates() bool {
|
||||
return needUpdate
|
||||
}
|
||||
|
||||
// checkResync checks whether a resync is needed.
|
||||
func (c *conditionManager) checkResync() bool {
|
||||
return c.clock.Now().Sub(c.latest) >= resyncPeriod
|
||||
// needResync checks whether a resync is needed.
|
||||
func (c *conditionManager) needResync() bool {
|
||||
// Only update when resync is needed.
|
||||
return c.clock.Now().Sub(c.latestTry) >= resyncPeriod && c.resyncNeeded
|
||||
}
|
||||
|
||||
// needHeartbeat checks whether a forcible heartbeat is needed.
|
||||
func (c *conditionManager) needHeartbeat() bool {
|
||||
return c.clock.Now().Sub(c.latestTry) >= heartbeatPeriod
|
||||
}
|
||||
|
||||
// sync synchronizes node conditions with the apiserver.
|
||||
func (c *conditionManager) sync() {
|
||||
c.latestTry = c.clock.Now()
|
||||
c.resyncNeeded = false
|
||||
conditions := []api.NodeCondition{}
|
||||
for i := range c.conditions {
|
||||
conditions = append(conditions, problemutil.ConvertToAPICondition(c.conditions[i]))
|
||||
@@ -128,7 +140,7 @@ func (c *conditionManager) sync() {
|
||||
if err := c.client.SetConditions(conditions); err != nil {
|
||||
// The conditions will be updated again in future sync
|
||||
glog.Errorf("failed to update node conditions: %v", err)
|
||||
c.resyncNeeded = true
|
||||
return
|
||||
}
|
||||
c.latest = c.clock.Now()
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ limitations under the License.
|
||||
package condition
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -47,7 +48,7 @@ func newTestCondition(condition string) types.Condition {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckUpdates(t *testing.T) {
|
||||
func TestNeedUpdates(t *testing.T) {
|
||||
m, _, _ := newTestManager()
|
||||
var c types.Condition
|
||||
for desc, test := range map[string]struct {
|
||||
@@ -75,19 +76,41 @@ func TestCheckUpdates(t *testing.T) {
|
||||
c = newTestCondition(test.condition)
|
||||
}
|
||||
m.UpdateCondition(c)
|
||||
assert.Equal(t, test.update, m.checkUpdates(), desc)
|
||||
assert.Equal(t, test.update, m.needUpdates(), desc)
|
||||
assert.Equal(t, c, m.conditions[c.Type], desc)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
func TestResync(t *testing.T) {
|
||||
m, fakeClient, fakeClock := newTestManager()
|
||||
condition := newTestCondition("TestCondition")
|
||||
m.conditions = map[string]types.Condition{condition.Type: condition}
|
||||
m.sync()
|
||||
expected := []api.NodeCondition{problemutil.ConvertToAPICondition(condition)}
|
||||
assert.Nil(t, fakeClient.AssertConditions(expected), "Condition should be updated via client")
|
||||
assert.False(t, m.checkResync(), "Should not resync before timeout exceeds")
|
||||
|
||||
assert.False(t, m.needResync(), "Should not resync before resync period")
|
||||
fakeClock.Step(resyncPeriod)
|
||||
assert.True(t, m.checkResync(), "Should resync after timeout exceeds")
|
||||
assert.False(t, m.needResync(), "Should not resync after resync period without resync needed")
|
||||
|
||||
fakeClient.InjectError("SetConditions", fmt.Errorf("injected error"))
|
||||
m.sync()
|
||||
|
||||
assert.False(t, m.needResync(), "Should not resync before resync period")
|
||||
fakeClock.Step(resyncPeriod)
|
||||
assert.True(t, m.needResync(), "Should resync after resync period and resync is needed")
|
||||
}
|
||||
|
||||
func TestHeartbeat(t *testing.T) {
|
||||
m, fakeClient, fakeClock := newTestManager()
|
||||
condition := newTestCondition("TestCondition")
|
||||
m.conditions = map[string]types.Condition{condition.Type: condition}
|
||||
m.sync()
|
||||
expected := []api.NodeCondition{problemutil.ConvertToAPICondition(condition)}
|
||||
assert.Nil(t, fakeClient.AssertConditions(expected), "Condition should be updated via client")
|
||||
|
||||
assert.False(t, m.needHeartbeat(), "Should not heartbeat before heartbeat period")
|
||||
|
||||
fakeClock.Step(heartbeatPeriod)
|
||||
assert.True(t, m.needHeartbeat(), "Should heartbeat after heartbeat period")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user