Tại sao chúng ta không nên chia nhỏ function.
package main
import (
	"context"
	"flag"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"time"

	corev1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)
// DrainOptions configures how a node drain is performed.
type DrainOptions struct {
	graceSeconds int64 // pod termination grace period, in seconds
	timeout time.Duration // overall deadline for the whole drain
	ignoreNamespaces map[string]struct{} // namespaces whose pods are left untouched
	forceDelete bool // delete instead of evict if eviction is blocked and timeout is nearing
	evictionRetryDelay time.Duration // pause between eviction/poll passes
}
func main() {
var (
nodeName string
kubeconfig string
grace int
timeout time.Duration
ignoreNS string
force bool
retryInterval time.Duration
)
flag.StringVar(&nodeName, "node", "", "node name to cordon and drain (required)")
flag.StringVar(&kubeconfig, "kubeconfig", "", "path to kubeconfig (defaults: in-cluster, then ~/.kube/config)")
flag.IntVar(&grace, "grace-seconds", 60, "pod termination grace period seconds")
flag.DurationVar(&timeout, "timeout", 20*time.Minute, "overall drain timeout")
flag.StringVar(&ignoreNS, "ignore-ns", "kube-system,kube-node-lease", "comma-separated namespaces to leave untouched")
flag.BoolVar(&force, "force", false, "if true, delete pods when eviction is blocked close to timeout")
flag.DurationVar(&retryInterval, "interval", 5*time.Second, "poll interval while waiting for drain")
flag.Parse()
if nodeName == "" {
fmt.Fprintln(os.Stderr, "error: -node is required")
os.Exit(2)
}
ctx := context.Background()
client, err := kubeClient(kubeconfig)
if err != nil {
fatal(err)
}
if err := cordonNode(ctx, client, nodeName); err != nil {
fatal(fmt.Errorf("cordon: %w", err))
}
opts := DrainOptions{
graceSeconds: int64(grace),
timeout: timeout,
ignoreNamespaces: toSet(ignoreNS),
forceDelete: force,
evictionRetryDelay: retryInterval,
}
if err := drainNode(ctx, client, nodeName, opts); err != nil {
fatal(fmt.Errorf("drain: %w", err))
}
fmt.Printf("Node %q cordoned and drained successfully\n", nodeName)
}
// kubeClient builds a Kubernetes clientset. An explicit kubeconfig path wins;
// otherwise in-cluster config is tried first, then ~/.kube/config.
func kubeClient(kubeconfig string) (*kubernetes.Clientset, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return kubernetes.NewForConfig(cfg)
	}
	// No explicit path: prefer in-cluster, fall back to the default kubeconfig.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		home, _ := os.UserHomeDir()
		cfg, err = clientcmd.BuildConfigFromFlags("", filepath.Join(home, ".kube", "config"))
		if err != nil {
			return nil, err
		}
	}
	return kubernetes.NewForConfig(cfg)
}
// cordonNode marks the node unschedulable. It is idempotent: a node that is
// already cordoned is left untouched and no update is issued.
func cordonNode(ctx context.Context, c *kubernetes.Clientset, nodeName string) error {
	n, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if n.Spec.Unschedulable {
		// Already cordoned; nothing to do.
		return nil
	}
	n.Spec.Unschedulable = true
	if _, err := c.CoreV1().Nodes().Update(ctx, n, metav1.UpdateOptions{}); err != nil {
		return err
	}
	return nil
}
// drainNode evicts all evictable pods from nodeName, then polls until either
// none remain or the configured timeout elapses.
func drainNode(ctx context.Context, c *kubernetes.Clientset, nodeName string, o DrainOptions) error {
	// Kick off evictions for everything currently evictable.
	if err := evictPodsOnNode(ctx, c, nodeName, o); err != nil {
		return err
	}
	// Poll condition: the node is drained once no evictable pods remain.
	drained := func(ctx context.Context) (bool, error) {
		pods, err := listPodsOnNode(ctx, c, nodeName)
		if err != nil {
			return false, err
		}
		return len(filterEvictablePods(pods, o)) == 0, nil
	}
	return wait.PollUntilContextTimeout(ctx, o.evictionRetryDelay, o.timeout, true, drained)
}
// evictPodsOnNode issues an API-initiated eviction (which honors PDBs) for
// every evictable pod on the node. If forceDelete is set and a PDB blocks an
// eviction (HTTP 429), the pod is deleted directly instead.
func evictPodsOnNode(ctx context.Context, c *kubernetes.Clientset, nodeName string, o DrainOptions) error {
	pods, err := listPodsOnNode(ctx, c, nodeName)
	if err != nil {
		return err
	}
	for _, pod := range filterEvictablePods(pods, o) {
		eviction := &policyv1.Eviction{
			ObjectMeta:    metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
			DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &o.graceSeconds},
		}
		_, evErr := c.PolicyV1().Evictions(pod.Namespace).Create(ctx, eviction, metav1.CreateOptions{})
		switch {
		case evErr == nil, apierrors.IsNotFound(evErr):
			// Evicted, or the pod is already gone.
		case apierrors.IsTooManyRequests(evErr) && o.forceDelete:
			// PDB blocked the eviction; fall back to a direct delete.
			_ = c.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &o.graceSeconds})
		default:
			return fmt.Errorf("evict %s/%s: %w", pod.Namespace, pod.Name, evErr)
		}
	}
	return nil
}
// listPodsOnNode lists every pod scheduled on nodeName (across all
// namespaces) and projects each one into a lightweight PodLite record.
func listPodsOnNode(ctx context.Context, c *kubernetes.Clientset, nodeName string) (*[]PodLite, error) {
	selector := fields.OneTermEqualSelector("spec.nodeName", nodeName).String()
	podList, err := c.CoreV1().Pods("").List(ctx, metav1.ListOptions{FieldSelector: selector})
	if err != nil {
		return nil, err
	}
	lite := make([]PodLite, len(podList.Items))
	for i := range podList.Items {
		src := &podList.Items[i]
		lite[i] = PodLite{
			Namespace:   src.Namespace,
			Name:        src.Name,
			OwnerKinds:  ownerKinds(src),
			Annotations: src.Annotations,
		}
	}
	// NOTE(review): returning *[]PodLite is unidiomatic Go — a plain []PodLite
	// would be clearer — but the pointer is kept to preserve the signature
	// that filterEvictablePods and drainNode depend on.
	return &lite, nil
}
// PodLite is a trimmed-down view of a pod, carrying only the fields the
// drain logic needs to decide whether a pod is evictable.
type PodLite struct {
	Namespace string // pod namespace
	Name string // pod name
	OwnerKinds []string // Kind of each owner reference (e.g. "ReplicaSet", "DaemonSet")
	Annotations map[string]string // pod annotations; used to detect mirror/static pods
}
// filterEvictablePods returns the pods that a drain may act on, excluding
// pods in ignored namespaces, mirror/static pods, and DaemonSet-managed pods
// (the kubelet owns mirror pods; DaemonSet pods are handled by the kubelet
// after the node goes away).
func filterEvictablePods(pods *[]PodLite, o DrainOptions) []PodLite {
	var evictable []PodLite
	for _, pod := range *pods {
		_, ignored := o.ignoreNamespaces[pod.Namespace]
		_, mirror := pod.Annotations["kubernetes.io/config.mirror"]
		if ignored || mirror || hasOwnerKind(pod.OwnerKinds, "DaemonSet") {
			continue
		}
		evictable = append(evictable, pod)
	}
	return evictable
}
func ownerKinds(pod *policyv1.Eviction) []string { return nil } // placeholder to satisfy compiler
// Real ownerKinds implementation:
func ownerKindsFromMetaOwners(owners []metav1.OwnerReference) []string {
kinds := make([]string, 0, len(owners))
for _, o := range owners {
kinds = append(kinds, o.Kind)
}
return kinds
}
func ownerKinds(p *PodLite) []string { return p.OwnerKinds } // kept for clarity
func ownerKinds(p *PodLite) []string { return p.OwnerKinds } // duplicate guard
// hasOwnerKind reports whether kind appears in kinds. A nil or empty slice
// yields false.
func hasOwnerKind(kinds []string, kind string) bool {
	for i := 0; i < len(kinds); i++ {
		if kinds[i] == kind {
			return true
		}
	}
	return false
}
// toSet parses a comma-separated list into a membership set. Entries are
// trimmed of surrounding whitespace; empty entries are dropped.
func toSet(csv string) map[string]struct{} {
	set := make(map[string]struct{})
	for _, raw := range strings.Split(csv, ",") {
		if name := strings.TrimSpace(raw); name != "" {
			set[name] = struct{}{}
		}
	}
	return set
}
// fatal prints err to stderr in the program's "error: ..." format and
// terminates the process with exit code 1. It never returns.
func fatal(err error) {
	fmt.Fprintf(os.Stderr, "error: %v\n", err)
	os.Exit(1)
}
Code này dùng anonymous function:
// drainNode evicts all evictable pods from nodeName and waits until none
// remain, using inline anonymous helpers for listing, filtering and eviction.
//
// Pods in ignored namespaces, mirror/static pods and DaemonSet-managed pods
// are left alone. Fix in this revision: an eviction rejected by a
// PodDisruptionBudget (HTTP 429) previously aborted the whole drain with an
// error; a 429 is a transient "not now", so such pods are now left for the
// next pass and retried until the timeout. With forceDelete set and the
// deadline near, blocked pods are deleted outright instead.
func drainNode(ctx context.Context, c *kubernetes.Clientset, nodeName string, o DrainOptions) error {
	// listPods returns every pod currently scheduled on the target node.
	listPods := func(ctx context.Context) ([]corev1.Pod, error) {
		podList, err := c.CoreV1().Pods("").List(ctx, metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
		})
		if err != nil {
			return nil, err
		}
		return podList.Items, nil
	}
	// filterEvictable drops pods a drain must not touch: ignored namespaces,
	// mirror/static pods, and DaemonSet-managed pods.
	filterEvictable := func(pods []corev1.Pod) []corev1.Pod {
		out := make([]corev1.Pod, 0, len(pods))
		for _, p := range pods {
			if _, skip := o.ignoreNamespaces[p.Namespace]; skip {
				continue
			}
			if _, isMirror := p.Annotations["kubernetes.io/config.mirror"]; isMirror {
				continue
			}
			isDS := false
			for _, ow := range p.OwnerReferences {
				if ow.Kind == "DaemonSet" {
					isDS = true
					break
				}
			}
			if isDS {
				continue
			}
			out = append(out, p)
		}
		return out
	}
	// tryEvict attempts an API-initiated eviction (honors PDBs). A PDB-blocked
	// eviction is not an error: near the deadline with forceDelete set the pod
	// is deleted directly; otherwise it is simply left for the next pass.
	tryEvict := func(ctx context.Context, p corev1.Pod, nearDeadline bool) error {
		ev := &policyv1.Eviction{
			ObjectMeta: metav1.ObjectMeta{
				Name:      p.Name,
				Namespace: p.Namespace,
			},
			DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &o.graceSeconds},
		}
		_, err := c.PolicyV1().Evictions(p.Namespace).Create(ctx, ev, metav1.CreateOptions{})
		if err == nil || apierrors.IsNotFound(err) {
			return nil
		}
		if apierrors.IsTooManyRequests(err) {
			if o.forceDelete && nearDeadline {
				// PDB blocked and we are out of patience: force delete.
				derr := c.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{
					GracePeriodSeconds: &o.graceSeconds,
				})
				if derr != nil && !apierrors.IsNotFound(derr) {
					return fmt.Errorf("delete %s/%s: %w", p.Namespace, p.Name, derr)
				}
			}
			// Transient PDB block: retry on the next pass.
			return nil
		}
		return fmt.Errorf("evict %s/%s: %w", p.Namespace, p.Name, err)
	}
	// Drain loop: list → filter → evict → wait, until empty or timed out.
	deadline := time.Now().Add(o.timeout)
	ticker := time.NewTicker(o.evictionRetryDelay)
	defer ticker.Stop()
	for {
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout waiting for node %q to drain", nodeName)
		}
		// 1) List and filter.
		pods, err := listPods(ctx)
		if err != nil {
			return err
		}
		evictable := filterEvictable(pods)
		if len(evictable) == 0 {
			return nil // drained
		}
		// 2) Evict/delete candidates.
		nearDeadline := time.Until(deadline) <= 2*o.evictionRetryDelay
		for _, p := range evictable {
			if err := tryEvict(ctx, p, nearDeadline); err != nil {
				return err
			}
		}
		// 3) Wait before the next pass, honoring context cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
Cách viết đơn giản hơn:
// drainNode repeatedly lists, filters and evicts pods on nodeName until no
// evictable pods remain or the configured timeout elapses.
//
// Skipped pods: ignored namespaces, mirror/static pods, DaemonSet-managed
// pods. Fix in this revision: an eviction rejected by a PodDisruptionBudget
// (HTTP 429) previously returned an error and aborted the drain on the first
// pass; a 429 is a transient "not now", so the pod is now left for the next
// pass and retried until the timeout. With forceDelete set and the deadline
// near, blocked pods are deleted directly instead.
func drainNode(ctx context.Context, c *kubernetes.Clientset, nodeName string, o DrainOptions) error {
	deadline := time.Now().Add(o.timeout)
	ticker := time.NewTicker(o.evictionRetryDelay)
	defer ticker.Stop()
	for {
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout waiting for node %q to drain", nodeName)
		}
		// 1) List pods on the node.
		podList, err := c.CoreV1().Pods("").List(ctx, metav1.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("spec.nodeName", nodeName).String(),
		})
		if err != nil {
			return err
		}
		// 2) Filter evictable pods (skip ignored namespaces, mirror/static, DaemonSet).
		evictable := make([]corev1.Pod, 0, len(podList.Items))
		for _, p := range podList.Items {
			if _, skip := o.ignoreNamespaces[p.Namespace]; skip {
				continue
			}
			if _, isMirror := p.Annotations["kubernetes.io/config.mirror"]; isMirror {
				continue
			}
			isDaemonSet := false
			for _, ow := range p.OwnerReferences {
				if ow.Kind == "DaemonSet" {
					isDaemonSet = true
					break
				}
			}
			if isDaemonSet {
				continue
			}
			evictable = append(evictable, p)
		}
		// 3) If nothing to evict, we're done.
		if len(evictable) == 0 {
			return nil
		}
		// 4) Evict; force delete near the deadline if PDB blocks and force is enabled.
		nearDeadline := time.Until(deadline) <= 2*o.evictionRetryDelay
		for _, p := range evictable {
			ev := &policyv1.Eviction{
				ObjectMeta: metav1.ObjectMeta{
					Name:      p.Name,
					Namespace: p.Namespace,
				},
				DeleteOptions: &metav1.DeleteOptions{GracePeriodSeconds: &o.graceSeconds},
			}
			_, err := c.PolicyV1().Evictions(p.Namespace).Create(ctx, ev, metav1.CreateOptions{})
			if err == nil || apierrors.IsNotFound(err) {
				continue
			}
			if apierrors.IsTooManyRequests(err) {
				// PDB is blocking this eviction.
				if o.forceDelete && nearDeadline {
					derr := c.CoreV1().Pods(p.Namespace).Delete(ctx, p.Name, metav1.DeleteOptions{
						GracePeriodSeconds: &o.graceSeconds,
					})
					if derr != nil && !apierrors.IsNotFound(derr) {
						return fmt.Errorf("delete %s/%s: %w", p.Namespace, p.Name, derr)
					}
				}
				// Transient PDB block: leave the pod for the next pass.
				continue
			}
			return fmt.Errorf("evict %s/%s: %w", p.Namespace, p.Name, err)
		}
		// 5) Wait for the next pass, honoring context cancellation.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}