UNCLASSIFIED

Unverified Commit e874140a authored by Jamie Phillips's avatar Jamie Phillips
Browse files

Refactored how we start calico with kubelet context to cancel



This addresses a few lingering issues that really cleans up our implementation.
Signed-off-by: default avatarJamie Phillips <jamie.phillips@suse.com>
parent d3968327
...@@ -145,15 +145,24 @@ func (p *PEBinaryConfig) Kubelet(args []string) error { ...@@ -145,15 +145,24 @@ func (p *PEBinaryConfig) Kubelet(args []string) error {
logrus.Infof("Running RKE2 kubelet %v", cleanArgs) logrus.Infof("Running RKE2 kubelet %v", cleanArgs)
go func() { go func() {
for { for {
ctx, cancel := context.WithCancel(context.Background())
go func() {
if err := p.cni.Start(ctx, p.cniConig); err != nil {
logrus.Errorf("error in cni start: %s", err)
}
}()
cmd := exec.Command(p.KubeletPath, cleanArgs...) cmd := exec.Command(p.KubeletPath, cleanArgs...)
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
err := cmd.Run() if err := cmd.Run(); err != nil {
logrus.Errorf("Kubelet exited: %v", err) logrus.Errorf("Kubelet exited: %v", err)
}
cancel()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
}() }()
return p.cni.Start(p.cniConig) return nil
} }
// KubeProxy starts the kubeproxy in a subprocess with watching goroutine. // KubeProxy starts the kubeproxy in a subprocess with watching goroutine.
...@@ -173,13 +182,14 @@ func (p *PEBinaryConfig) KubeProxy(args []string) error { ...@@ -173,13 +182,14 @@ func (p *PEBinaryConfig) KubeProxy(args []string) error {
extraArgs["enable-dsr"] = "true" extraArgs["enable-dsr"] = "true"
} }
var vip string
for range time.Tick(time.Second * 5) { for range time.Tick(time.Second * 5) {
endpoint, err := hcsshim.GetHNSEndpointByName("Calico_ep") endpoint, err := hcsshim.GetHNSEndpointByName("Calico_ep")
if err != nil { if err != nil {
logrus.WithError(err).Warningf("can't find %s, retrying", "Calico_ep") logrus.WithError(err).Warningf("can't find %s, retrying", "Calico_ep")
continue continue
} }
extraArgs["source-vip"] = endpoint.IPAddress.String() vip = endpoint.IPAddress.String()
break break
} }
...@@ -190,6 +200,13 @@ func (p *PEBinaryConfig) KubeProxy(args []string) error { ...@@ -190,6 +200,13 @@ func (p *PEBinaryConfig) KubeProxy(args []string) error {
} }
args = append(getArgs(extraArgs), args...) args = append(getArgs(extraArgs), args...)
for i, arg := range args {
if strings.Contains(arg, "source-vip") {
args[i] = "--source-vip=" + vip
}
}
logrus.Infof("Running RKE2 kube-proxy %s", args) logrus.Infof("Running RKE2 kube-proxy %s", args)
go func() { go func() {
for { for {
......
...@@ -106,24 +106,25 @@ func (c *Calico) Setup(ctx context.Context, dataDir string, nodeConfig *daemonco ...@@ -106,24 +106,25 @@ func (c *Calico) Setup(ctx context.Context, dataDir string, nodeConfig *daemonco
if err := createCNIConfig(cfg); err != nil { if err := createCNIConfig(cfg); err != nil {
return nil, err return nil, err
} }
logrus.Info("Generating HNS networks, please wait")
if err := generateCalicoNetworks(cfg.CalicoConfig.NetworkingBackend); err != nil {
return nil, err
}
return cfg, nil return cfg, nil
} }
// Start starts the CNI services on the Windows node. // Start starts the CNI services on the Windows node.
func (c *Calico) Start(config *CNIConfig) error { func (c *Calico) Start(ctx context.Context, config *CNIConfig) error {
if err := generateCalicoNetworks(config.CalicoConfig.NetworkingBackend); err != nil { for {
return err if err := startCalico(ctx, config.CalicoConfig); err != nil {
} continue
}
if err := startCalico(config.CalicoConfig); err != nil { break
return err
} }
go startFelix(ctx, config.CalicoConfig)
time.Sleep(5 * time.Second)
if err := startFelix(config.CalicoConfig); err != nil {
return err
}
return nil return nil
} }
...@@ -190,7 +191,7 @@ func getDefaultConfig(config *CNIConfig, dataDir string, nodeConfig *config.Node ...@@ -190,7 +191,7 @@ func getDefaultConfig(config *CNIConfig, dataDir string, nodeConfig *config.Node
return nil return nil
} }
func startFelix(config *CalicoConfig) error { func startFelix(ctx context.Context, config *CalicoConfig) {
specificEnvs := []string{ specificEnvs := []string{
fmt.Sprintf("FELIX_FELIXHOSTNAME=%s", config.Hostname), fmt.Sprintf("FELIX_FELIXHOSTNAME=%s", config.Hostname),
fmt.Sprintf("FELIX_VXLANVNI=%s", config.Felix.Vxlanvni), fmt.Sprintf("FELIX_VXLANVNI=%s", config.Felix.Vxlanvni),
...@@ -202,21 +203,16 @@ func startFelix(config *CalicoConfig) error { ...@@ -202,21 +203,16 @@ func startFelix(config *CalicoConfig) error {
} }
logrus.Infof("Felix Envs: ", append(generateGeneralCalicoEnvs(config), specificEnvs...)) logrus.Infof("Felix Envs: ", append(generateGeneralCalicoEnvs(config), specificEnvs...))
go func() { cmd := exec.CommandContext(ctx, "calico-node.exe", args...)
for { cmd.Env = append(generateGeneralCalicoEnvs(config), specificEnvs...)
cmd := exec.Command("calico-node.exe", args...) cmd.Stdout = os.Stdout
cmd.Env = append(generateGeneralCalicoEnvs(config), specificEnvs...) cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout if err := cmd.Run(); err != nil {
cmd.Stderr = os.Stderr logrus.Errorf("Felix exited: %v", err)
err := cmd.Run() }
logrus.Errorf("Felix exited: %v", err)
time.Sleep(5 * time.Second)
}
}()
return nil
} }
func startCalico(config *CalicoConfig) error { func startCalico(ctx context.Context, config *CalicoConfig) error {
specificEnvs := []string{ specificEnvs := []string{
fmt.Sprintf("CALICO_NODENAME_FILE=%s", config.NodeNameFile), fmt.Sprintf("CALICO_NODENAME_FILE=%s", config.NodeNameFile),
} }
...@@ -225,17 +221,14 @@ func startCalico(config *CalicoConfig) error { ...@@ -225,17 +221,14 @@ func startCalico(config *CalicoConfig) error {
"-startup", "-startup",
} }
logrus.Infof("Calico Envs: ", append(generateGeneralCalicoEnvs(config), specificEnvs...)) logrus.Infof("Calico Envs: ", append(generateGeneralCalicoEnvs(config), specificEnvs...))
go func() { cmd := exec.CommandContext(ctx, "calico-node.exe", args...)
for { cmd.Env = append(generateGeneralCalicoEnvs(config), specificEnvs...)
cmd := exec.Command("calico-node.exe", args...) cmd.Stdout = os.Stdout
cmd.Env = append(generateGeneralCalicoEnvs(config), specificEnvs...) cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout if err := cmd.Run(); err != nil {
cmd.Stderr = os.Stderr logrus.Errorf("Calico exited: %v", err)
err := cmd.Run() return err
logrus.Errorf("Calico exited: %v", err) }
time.Sleep(5 * time.Second)
}
}()
return nil return nil
} }
...@@ -309,7 +302,6 @@ func generateCalicoNetworks(backend string) error { ...@@ -309,7 +302,6 @@ func generateCalicoNetworks(backend string) error {
logrus.Debug("Waiting for management ip..") logrus.Debug("Waiting for management ip..")
mgmt := waitForManagementIP(CalicoHnsNetworkName) mgmt := waitForManagementIP(CalicoHnsNetworkName)
time.Sleep(90 * time.Second)
platform, err := getPlatformType() platform, err := getPlatformType()
if err != nil { if err != nil {
return err return err
......
...@@ -11,8 +11,8 @@ import ( ...@@ -11,8 +11,8 @@ import (
) )
type CNI interface { type CNI interface {
Setup(ctx context.Context, dataDir string, nodeConfig *daemonconfig.Node, restConfig *rest.Config) (*CNIConfig, error) Setup(context.Context, string, *daemonconfig.Node, *rest.Config) (*CNIConfig, error)
Start(config *CNIConfig) error Start(context.Context, *CNIConfig) error
} }
type CNIConfig struct { type CNIConfig struct {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment