diff --git a/Makefile b/Makefile index 8e3bbdc9..91da3f79 100644 --- a/Makefile +++ b/Makefile @@ -89,8 +89,8 @@ container: ############################ build local .PHONY: container-local container-local: kubevpn-linux-amd64 - docker buildx build --platform linux/amd64 -t ${IMAGE} -f $(BUILD_DIR)/local.Dockerfile . + docker buildx build --platform linux/amd64 -t docker.io/naison/kubevpn:latest -f $(BUILD_DIR)/local.Dockerfile . .PHONY: container-test container-test: kubevpn-linux-amd64 - docker buildx build --platform linux/amd64 -t ${IMAGE} -f $(BUILD_DIR)/test.Dockerfile --push . \ No newline at end of file + docker buildx build --platform linux/amd64 -t docker.io/naison/kubevpn:test -f $(BUILD_DIR)/test.Dockerfile --push . \ No newline at end of file diff --git a/cmd/kubevpn/cmds/duplicate.go b/cmd/kubevpn/cmds/duplicate.go new file mode 100644 index 00000000..3325547d --- /dev/null +++ b/cmd/kubevpn/cmds/duplicate.go @@ -0,0 +1,131 @@ +package cmds + +import ( + "context" + "fmt" + "io" + defaultlog "log" + "net/http" + "os" + "syscall" + + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + utilcomp "k8s.io/kubectl/pkg/util/completion" + "k8s.io/kubectl/pkg/util/i18n" + "k8s.io/kubectl/pkg/util/templates" + + "github.com/wencaiwulue/kubevpn/pkg/config" + "github.com/wencaiwulue/kubevpn/pkg/handler" + "github.com/wencaiwulue/kubevpn/pkg/util" +) + +// CmdDuplicate multiple cluster operate, can start up one deployment to another cluster +// kubectl exec POD_NAME -c CONTAINER_NAME /sbin/killall5 or ephemeralcontainers +func CmdDuplicate(f cmdutil.Factory) *cobra.Command { + var duplicateOptions = handler.DuplicateOptions{} + var sshConf = util.SshConfig{} + cmd := &cobra.Command{ + Use: "duplicate", + Short: i18n.T("Connect to kubernetes cluster network, or proxy kubernetes workloads inbound traffic into local PC"), + Long: templates.LongDesc(i18n.T(`Connect to kubernetes cluster network, or proxy kubernetes workloads inbound traffic into local PC`)), + Example: templates.Examples(i18n.T(` + # duplicate + - duplicate deployment + kubevpn duplicate deployment/productpage + + - duplicate service + kubevpn proxy service/productpage + + - duplicate multiple workloads + kubevpn duplicate deployment/authors deployment/productpage + or + kubevpn duplicate deployment authors productpage + + # Reverse duplicate with mesh, traffic with header a=1, will hit local PC, otherwise no effect + kubevpn duplicate service/productpage --headers a=1 + + # Connect to api-server behind of bastion host or ssh jump host and proxy kubernetes resource traffic into local PC + kubevpn duplicate --ssh-addr 192.168.1.100:22 --ssh-username root --ssh-keyfile /Users/naison/.ssh/ssh.pem service/productpage --headers a=1 + + # it also support ProxyJump, like + ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌────────────┐ + │ pc ├────►│ ssh1 ├────►│ ssh2 ├────►│ ssh3 ├─────►... ─────► │ api-server │ + └──────┘ └──────┘ └──────┘ └──────┘ └────────────┘ + kubevpn duplicate service/productpage --ssh-alias --headers a=1 + +`)), + PreRunE: func(cmd *cobra.Command, args []string) (err error) { + if !util.IsAdmin() { + util.RunWithElevated() + os.Exit(0) + } + go http.ListenAndServe("localhost:6060", nil) + util.InitLogger(config.Debug) + defaultlog.Default().SetOutput(io.Discard) + return handler.SshJump(sshConf, cmd.Flags()) + }, + RunE: func(cmd *cobra.Command, args []string) error { + if len(args) == 0 { + _, _ = fmt.Fprintf(os.Stdout, "You must specify the type of resource to proxy. %s\n\n", cmdutil.SuggestAPIResources("kubevpn")) + fullCmdName := cmd.Parent().CommandPath() + usageString := "Required resource not specified." + if len(fullCmdName) > 0 && cmdutil.IsSiblingCommandExists(cmd, "explain") { + usageString = fmt.Sprintf("%s\nUse \"%s explain \" for a detailed description of that resource (e.g. %[2]s explain pods).", usageString, fullCmdName) + } + return cmdutil.UsageErrorf(cmd, usageString) + } + + connectOptions := handler.ConnectOptions{ + Namespace: duplicateOptions.Namespace, + Workloads: args, + } + if err := connectOptions.InitClient(f); err != nil { + return err + } + err := connectOptions.PreCheckResource() + if err != nil { + return err + } + duplicateOptions.Workloads = connectOptions.Workloads + connectOptions.Workloads = []string{} + if err = connectOptions.DoConnect(); err != nil { + log.Errorln(err) + handler.Cleanup(syscall.SIGQUIT) + } else { + err = duplicateOptions.InitClient(f) + if err != nil { + return err + } + err = duplicateOptions.DoDuplicate(context.Background()) + if err != nil { + return err + } + fmt.Println() + fmt.Println(`----------------------------------------------------------------------------------`) + fmt.Println(` Now duplicate workloads running successfully on other cluster, enjoy it :) `) + fmt.Println(`----------------------------------------------------------------------------------`) + fmt.Println() + } + select {} + }, + } + cmd.Flags().StringToStringVarP(&duplicateOptions.Headers, "headers", "H", map[string]string{}, "Traffic with special headers with reverse it to duplicate workloads, you should startup your service after reverse workloads successfully, If not special, redirect all traffic to duplicate workloads, format is k=v, like: k1=v1,k2=v2") + cmd.Flags().BoolVar(&config.Debug, "debug", false, "Enable debug mode or not, true or false") + cmd.Flags().StringVar(&config.Image, "image", config.Image, "Use this image to startup container") + cmd.Flags().StringVar(&duplicateOptions.TargetImage, "target-image", "", "Duplicate container use this image to startup container, if not special, use origin origin image") + cmd.Flags().StringVar(&duplicateOptions.TargetContainer, "target-container", "", "Duplicate container use special image to startup this container, if not special, use origin origin image") + cmd.Flags().StringVar(&duplicateOptions.TargetNamespace, "target-namespace", "", "Duplicate workloads in this namespace, if not special, use origin namespace") + cmd.Flags().StringVar(&duplicateOptions.TargetKubeconfig, "target-kubeconfig", "", "Duplicate workloads will create in this cluster, if not special, use origin cluster") + + // for ssh jumper host + cmd.Flags().StringVar(&sshConf.Addr, "ssh-addr", "", "Optional ssh jump server address to dial as :, eg: 127.0.0.1:22") + cmd.Flags().StringVar(&sshConf.User, "ssh-username", "", "Optional username for ssh jump server") + cmd.Flags().StringVar(&sshConf.Password, "ssh-password", "", "Optional password for ssh jump server") + cmd.Flags().StringVar(&sshConf.Keyfile, "ssh-keyfile", "", "Optional file with private key for SSH authentication") + cmd.Flags().StringVar(&sshConf.ConfigAlias, "ssh-alias", "", "Optional config alias with ~/.ssh/config for SSH authentication") + + cmd.ValidArgsFunction = utilcomp.ResourceTypeAndNameCompletionFunc(f) + return cmd +} diff --git a/cmd/kubevpn/cmds/root.go b/cmd/kubevpn/cmds/root.go index f39b73fd..33a0ccb1 100644 --- a/cmd/kubevpn/cmds/root.go +++ b/cmd/kubevpn/cmds/root.go @@ -31,6 +31,7 @@ func NewKubeVPNCommand() *cobra.Command { CmdConnect(factory), CmdProxy(factory), CmdDev(factory), + CmdDuplicate(factory), CmdReset(factory), CmdUpgrade(factory), CmdVersion(factory), diff --git a/pkg/config/config.go b/pkg/config/config.go index bb778ecd..09311225 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -4,6 +4,8 @@ import ( "net" "sync" "time" + + "sigs.k8s.io/kustomize/api/konfig" ) const ( @@ -51,6 +53,11 @@ const ( // api APIRentIP = "/rent/ip" APIReleaseIP = "/release/ip" + + KUBECONFIG = "kubeconfig" + + // labels + ManageBy = konfig.ManagedbyLabelKey ) var ( diff --git a/pkg/handler/duplicate.go b/pkg/handler/duplicate.go new file mode 100644 index 00000000..6aa005dc --- /dev/null +++ b/pkg/handler/duplicate.go @@ -0,0 +1,602 @@ +package handler + +import ( + "context" + "encoding/json" + "fmt" + "github.com/wencaiwulue/kubevpn/pkg/dev" + "github.com/wencaiwulue/kubevpn/pkg/mesh" + "path/filepath" + "sort" + "strings" + "time" + + "github.com/google/uuid" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/cli-runtime/pkg/genericclioptions" + runtimeresource "k8s.io/cli-runtime/pkg/resource" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/clientcmd/api" + "k8s.io/client-go/tools/clientcmd/api/latest" + clientcmdlatest "k8s.io/client-go/tools/clientcmd/api/latest" + "k8s.io/client-go/util/retry" + cmdutil "k8s.io/kubectl/pkg/cmd/util" + "k8s.io/kubectl/pkg/polymorphichelpers" + "k8s.io/kubectl/pkg/util/podutils" + "k8s.io/utils/pointer" + + "github.com/wencaiwulue/kubevpn/pkg/config" + "github.com/wencaiwulue/kubevpn/pkg/util" +) + +type DuplicateOptions struct { + Namespace string + Headers map[string]string + Workloads []string + + TargetKubeconfig string + TargetNamespace string + TargetContainer string + TargetImage string + + isSame bool + + targetClientset *kubernetes.Clientset + targetRestclient *rest.RESTClient + targetConfig *rest.Config + targetFactory cmdutil.Factory + + clientset *kubernetes.Clientset + restclient *rest.RESTClient + config *rest.Config + factory cmdutil.Factory +} + +func (d *DuplicateOptions) InitClient(f cmdutil.Factory) (err error) { + d.factory = f + if d.config, err = d.factory.ToRESTConfig(); err != nil { + return + } + if d.restclient, err = d.factory.RESTClient(); err != nil { + return + } + if d.clientset, err = d.factory.KubernetesClientSet(); err != nil { + return + } + if d.Namespace, _, err = d.factory.ToRawKubeConfigLoader().Namespace(); err != nil { + return + } + + // init target info + if len(d.TargetKubeconfig) == 0 { + d.targetFactory = d.factory + d.targetClientset = d.clientset + d.targetConfig = d.config + d.targetRestclient = d.restclient + if len(d.TargetNamespace) == 0 { + d.TargetNamespace = d.Namespace + d.isSame = true + } + return + } + configFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag() + configFlags.KubeConfig = pointer.String(d.TargetKubeconfig) + configFlags.Namespace = pointer.String(d.TargetNamespace) + matchVersionFlags := cmdutil.NewMatchVersionFlags(configFlags) + d.targetFactory = cmdutil.NewFactory(matchVersionFlags) + loader := d.targetFactory.ToRawKubeConfigLoader() + var found bool + d.TargetNamespace, found, err = loader.Namespace() + if err != nil || !found { + d.TargetNamespace = d.Namespace + } + d.targetClientset, err = d.targetFactory.KubernetesClientSet() + return +} + +// DoDuplicate +/* +* 1) download mount path use empty-dir but skip empty-dir in init-containers +* 2) get env from containers +* 3) create serviceAccount as needed +* 4) modify podTempSpec inject kubevpn container + */ +func (d *DuplicateOptions) DoDuplicate(ctx context.Context) error { + rawConfig, err := d.targetFactory.ToRawKubeConfigLoader().RawConfig() + if err != nil { + return err + } + err = api.FlattenConfig(&rawConfig) + if err != nil { + return err + } + rawConfig.SetGroupVersionKind(schema.GroupVersionKind{Version: clientcmdlatest.Version, Kind: "Config"}) + var convertedObj runtime.Object + convertedObj, err = latest.Scheme.ConvertToVersion(&rawConfig, latest.ExternalVersion) + if err != nil { + return err + } + var kubeconfigJsonBytes []byte + kubeconfigJsonBytes, err = json.Marshal(convertedObj) + if err != nil { + return err + } + + for _, workload := range d.Workloads { + var object *runtimeresource.Info + object, err = util.GetUnstructuredObject(d.factory, d.Namespace, workload) + if err != nil { + return err + } + u := object.Object.(*unstructured.Unstructured) + u.SetNamespace(d.TargetNamespace) + RemoveUselessInfo(u) + var newUUID uuid.UUID + newUUID, err = uuid.NewUUID() + if err != nil { + return err + } + u.SetName(fmt.Sprintf("%s-dup-%s", u.GetName(), newUUID.String()[:5])) + + err = d.CreateNecessaryResource(u) + if err != nil { + return err + } + + labelsMap := map[string]string{ + config.ManageBy: config.ConfigMapPodTrafficManager, + "owner-ref": u.GetName(), + } + var path []string + _, path, err = util.GetPodTemplateSpecPath(u) + if err != nil { + return err + } + + err = unstructured.SetNestedStringMap(u.Object, labelsMap, "spec", "selector", "matchLabels") + if err != nil { + return err + } + var client dynamic.Interface + client, err = d.targetFactory.DynamicClient() + if err != nil { + return err + } + RollbackFuncList = append(RollbackFuncList, func() { + _ = client.Resource(object.Mapping.Resource).Namespace(d.TargetNamespace).Delete(context.Background(), u.GetName(), metav1.DeleteOptions{}) + }) + retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + var volumesPath = append(path, "spec", "volumes") + var containersPath = append(path, "spec", "containers") + var annotationPath = append(path, "metadata", "annotations") + var labelsPath = append(path, "metadata", "labels") + + // (1) add annotation KUBECONFIG + stringMap, found, err := unstructured.NestedStringMap(u.Object, annotationPath...) + if err != nil { + return err + } + if !found { + stringMap = map[string]string{} + } + stringMap[config.KUBECONFIG] = string(kubeconfigJsonBytes) + if err = unstructured.SetNestedStringMap(u.Object, stringMap, annotationPath...); err != nil { + return err + } + + // (2) modify labels + if err = unstructured.SetNestedStringMap(u.Object, labelsMap, labelsPath...); err != nil { + return err + } + + // (3) add volumes KUBECONFIG + volumes, found, err := unstructured.NestedSlice(u.Object, volumesPath...) + if err != nil { + return err + } + if !found { + volumes = []interface{}{} + } + volume := &v1.Volume{ + Name: config.KUBECONFIG, + VolumeSource: v1.VolumeSource{ + DownwardAPI: &v1.DownwardAPIVolumeSource{ + Items: []v1.DownwardAPIVolumeFile{{ + Path: config.KUBECONFIG, + FieldRef: &v1.ObjectFieldSelector{ + FieldPath: fmt.Sprintf("metadata.annotations['%s']", config.KUBECONFIG), + }, + }}, + }, + }, + } + marshal, err := json.Marshal(volume) + v := unstructured.Unstructured{} + err = v.UnmarshalJSON(marshal) + if err = unstructured.SetNestedSlice(u.Object, append(volumes, v.Object), volumesPath...); err != nil { + return err + } + + // (4) add kubevpn containers + containers, found, err := unstructured.NestedSlice(u.Object, containersPath...) + if err != nil || !found || containers == nil { + return fmt.Errorf("deployment containers not found or error in spec: %v", err) + } + if d.TargetImage != "" { + var index = -1 + if d.TargetContainer != "" { + for i, container := range containers { + nestedString, _, err := unstructured.NestedString(container.(map[string]interface{}), "name") + if err == nil && nestedString == d.TargetContainer { + index = i + break + } + } + } else { + index = 0 + } + if index < 0 { + return fmt.Errorf("can not found container %s in pod template", d.TargetContainer) + } + // update container[index] image + if err = unstructured.SetNestedField(containers[index].(map[string]interface{}), d.TargetImage, "image"); err != nil { + return err + } + } + container := &v1.Container{ + Name: config.ContainerSidecarVPN, + Image: config.Image, + Command: []string{ + "kubevpn", + "proxy", + workload, + "--kubeconfig", "/tmp/.kube/" + config.KUBECONFIG, + "--namespace", d.Namespace, + "--headers", labels.Set(d.Headers).String(), + "--image", config.Image, + }, + Args: nil, + Resources: v1.ResourceRequirements{ + Requests: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("1024Mi"), + }, + Limits: map[v1.ResourceName]resource.Quantity{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("2048Mi"), + }, + }, + VolumeMounts: []v1.VolumeMount{{ + Name: config.KUBECONFIG, + ReadOnly: false, + MountPath: "/tmp/.kube", + }}, + ImagePullPolicy: v1.PullIfNotPresent, + SecurityContext: &v1.SecurityContext{ + Capabilities: &v1.Capabilities{ + Add: []v1.Capability{ + "NET_ADMIN", + }, + }, + RunAsUser: pointer.Int64(0), + Privileged: pointer.Bool(true), + }, + } + marshal, err = json.Marshal(container) + v = unstructured.Unstructured{} + err = v.UnmarshalJSON(marshal) + if err = unstructured.SetNestedField(u.Object, append(containers, v.Object), containersPath...); err != nil { + return err + } + + _, createErr := runtimeresource.NewHelper(object.Client, object.Mapping).Create(d.TargetNamespace, true, u) + return createErr + }) + if retryErr != nil { + return fmt.Errorf("create duplidate for resource %s failed: %v", workload, retryErr) + } + err = util.WaitPodToBeReady(ctx, d.targetClientset.CoreV1().Pods(d.TargetNamespace), metav1.LabelSelector{MatchLabels: labelsMap}) + if err != nil { + return err + } + _ = util.RolloutStatus(ctx, d.factory, d.Namespace, workload, time.Minute*60) + } + return nil +} + +func RemoveUselessInfo(u *unstructured.Unstructured) { + if u == nil { + return + } + + delete(u.Object, "status") + _ = unstructured.SetNestedField(u.Object, nil, "status") + + u.SetManagedFields(nil) + u.SetResourceVersion("") + u.SetCreationTimestamp(metav1.NewTime(time.Time{})) + u.SetUID("") + u.SetGeneration(0) + a := u.GetAnnotations() + if len(a) == 0 { + a = map[string]string{} + } + delete(a, "kubectl.kubernetes.io/last-applied-configuration") + u.SetAnnotations(a) +} + +// CreateNecessaryResource +/* +1) calculate volume content, and download it into emptyDir +*/ +func (d *DuplicateOptions) CreateNecessaryResource(u *unstructured.Unstructured) error { + + const TokenVolumeMountPath = "/var/run/secrets/kubernetes.io/serviceaccount" + + type VolumeMountContainerPair struct { + container v1.Container + volumeMount v1.VolumeMount + } + temp, path, err := util.GetPodTemplateSpecPath(u) + if err != nil { + return err + } + + sortBy := func(pods []*v1.Pod) sort.Interface { + for i := 0; i < len(pods); i++ { + if pods[i].DeletionTimestamp != nil { + pods = append(pods[:i], pods[i+1:]...) + i-- + } + } + return sort.Reverse(podutils.ActivePods(pods)) + } + lab := labels.SelectorFromSet(temp.Labels).String() + pod, _, err := polymorphichelpers.GetFirstPod(d.clientset.CoreV1(), d.Namespace, lab, time.Second*60, sortBy) + if err != nil { + return err + } + + // remove serviceAccount info + temp.Spec.ServiceAccountName = "" + temp.Spec.AutomountServiceAccountToken = pointer.Bool(false) + + var volumeMap = make(map[string]v1.Volume) + var volumeList []v1.Volume + // pod's volume maybe more than spec defined + for _, volume := range pod.Spec.Volumes { + volumeMap[volume.Name] = volume + + // keep volume emptyDir + if volume.EmptyDir != nil { + volumeList = append(volumeList, volume) + } else { + volumeList = append(volumeList, v1.Volume{ + Name: volume.Name, + VolumeSource: v1.VolumeSource{ + EmptyDir: &v1.EmptyDirVolumeSource{}, + }, + }) + } + } + + var tokenVolume string + var volumeM = make(map[string][]VolumeMountContainerPair) + for _, container := range pod.Spec.Containers { + // group by volume name, what we want is figure out what's contains in every volume + // we need to restore a total volume base on mountPath and subPath + for _, volumeMount := range container.VolumeMounts { + if volumeMap[volumeMount.Name].EmptyDir != nil { + continue + } + if volumeMount.MountPath == TokenVolumeMountPath { + tokenVolume = volumeMount.Name + } + mounts := volumeM[volumeMount.Name] + if mounts == nil { + volumeM[volumeMount.Name] = []VolumeMountContainerPair{} + } + volumeM[volumeMount.Name] = append(volumeM[volumeMount.Name], VolumeMountContainerPair{ + container: container, + volumeMount: volumeMount, + }) + } + } + + var initContainer []v1.Container + for _, volume := range pod.Spec.Volumes { + mountPoint := "/tmp/" + volume.Name + var args []string + for _, pair := range volumeM[volume.Name] { + remote := filepath.Join(pair.volumeMount.MountPath, pair.volumeMount.SubPath) + local := filepath.Join(mountPoint, pair.volumeMount.SubPath) + // kubectl cp /:/tmp/foo /tmp/bar + args = append(args, + fmt.Sprintf("kubevpn cp %s/%s:%s %s -c %s", pod.Namespace, pod.Name, remote, local, pair.container.Name), + ) + } + // means maybe volume only used in initContainers + if len(args) == 0 { + for i := 0; i < len(temp.Spec.InitContainers); i++ { + var found bool + for _, mount := range temp.Spec.InitContainers[i].VolumeMounts { + if mount.MountPath == TokenVolumeMountPath { + found = true + break + } + } + // remove useless initContainer + if found { + temp.Spec.InitContainers = append(temp.Spec.InitContainers[:i], temp.Spec.InitContainers[i+1:]...) + i-- + } + } + continue + } + newContainer := v1.Container{ + Name: fmt.Sprintf("download-" + volume.Name), + Image: config.Image, + Command: []string{"sh", "-c"}, + Args: []string{strings.Join(args, "&&")}, + WorkingDir: "/tmp", + Env: []v1.EnvVar{ + { + Name: clientcmd.RecommendedConfigPathEnvVar, + Value: "/tmp/.kube/kubeconfig", + }, + }, + Resources: v1.ResourceRequirements{}, + VolumeMounts: []v1.VolumeMount{ + { + Name: volume.Name, + MountPath: mountPoint, + }, + { + Name: config.KUBECONFIG, + ReadOnly: false, + MountPath: "/tmp/.kube", + }, + }, + ImagePullPolicy: v1.PullIfNotPresent, + } + initContainer = append(initContainer, newContainer) + } + // put download volume to front + temp.Spec.InitContainers = append(initContainer, temp.Spec.InitContainers...) + // replace old one + temp.Spec.Volumes = volumeList + // remove containers vpn and envoy-proxy + mesh.RemoveContainers(temp) + // add each container service account token + if tokenVolume != "" { + for i := 0; i < len(temp.Spec.Containers); i++ { + var found bool + for _, mount := range temp.Spec.Containers[i].VolumeMounts { + if mount.MountPath == TokenVolumeMountPath { + found = true + break + } + } + if !found { + temp.Spec.Containers[i].VolumeMounts = append(temp.Spec.Containers[i].VolumeMounts, v1.VolumeMount{ + Name: tokenVolume, + MountPath: TokenVolumeMountPath, + }) + } + } + } + var marshal []byte + if marshal, err = json.Marshal(temp.Spec); err != nil { + return err + } + var content map[string]interface{} + err = json.Unmarshal(marshal, &content) + if err != nil { + return err + } + if err = unstructured.SetNestedField(u.Object, content, append(path, "spec")...); err != nil { + return err + } + return nil +} + +// todo set env +func (d DuplicateOptions) setEnv(u *unstructured.Unstructured) error { + temp, _, err := util.GetPodTemplateSpecPath(u) + if err != nil { + return err + } + + sortBy := func(pods []*v1.Pod) sort.Interface { + for i := 0; i < len(pods); i++ { + if pods[i].DeletionTimestamp != nil { + pods = append(pods[:i], pods[i+1:]...) + i-- + } + } + return sort.Reverse(podutils.ActivePods(pods)) + } + lab := labels.SelectorFromSet(temp.Labels).String() + pod, _, err := polymorphichelpers.GetFirstPod(d.clientset.CoreV1(), d.Namespace, lab, time.Second*60, sortBy) + if err != nil { + return err + } + + var envMap map[string][]string + envMap, err = dev.GetEnv(context.Background(), d.factory, d.Namespace, pod.Name) + if err != nil { + return err + } + println(envMap) + + var secretMap = make(map[string]*v1.Secret) + var configmapMap = make(map[string]*v1.ConfigMap) + + var howToGetCm = func(name string) { + if configmapMap[name] == nil { + cm, err := d.clientset.CoreV1().ConfigMaps(d.Namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err == nil { + configmapMap[name] = cm + } + } + } + var howToGetSecret = func(name string) { + if configmapMap[name] == nil { + secret, err := d.clientset.CoreV1().Secrets(d.Namespace).Get(context.Background(), name, metav1.GetOptions{}) + if err == nil { + secretMap[name] = secret + } + } + } + + for _, container := range temp.Spec.Containers { + for _, envVar := range container.Env { + if envVar.ValueFrom != nil { + if ref := envVar.ValueFrom.ConfigMapKeyRef; ref != nil { + howToGetCm(ref.Name) + } + if ref := envVar.ValueFrom.SecretKeyRef; ref != nil { + howToGetSecret(ref.Name) + } + } + } + for _, source := range container.EnvFrom { + if ref := source.ConfigMapRef; ref != nil { + if configmapMap[ref.Name] == nil { + howToGetCm(ref.Name) + } + } + if ref := source.SecretRef; ref != nil { + howToGetSecret(ref.Name) + } + } + } + + for i := 0; i < len(temp.Spec.Containers); i++ { + container := temp.Spec.Containers[i] + for i, envFromSource := range container.EnvFrom { + if ref := envFromSource.ConfigMapRef; ref != nil && configmapMap[ref.Name] != nil { + if envFromSource.Prefix != "" { + println(i) + } + } + if envFromSource.SecretRef != nil { + + } + } + + for _, envVar := range container.Env { + println(envVar) + } + } + return nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 4d5238df..035b3e8e 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -657,3 +657,47 @@ func CleanExtensionLib() { } } } + +func WaitPodToBeReady(ctx context.Context, podInterface v12.PodInterface, selector metav1.LabelSelector) error { + watchStream, err := podInterface.Watch(ctx, metav1.ListOptions{ + LabelSelector: fields.SelectorFromSet(selector.MatchLabels).String(), + }) + if err != nil { + return err + } + defer watchStream.Stop() + var last string + for { + select { + case e, ok := <-watchStream.ResultChan(): + if !ok { + return fmt.Errorf("can not wait pod to be ready because of watch chan has closed") + } + if podT, ok := e.Object.(*v1.Pod); ok { + if podT.DeletionTimestamp != nil { + continue + } + var sb = bytes.NewBuffer(nil) + sb.WriteString(fmt.Sprintf("pod [%s] status is %s\n", podT.Name, podT.Status.Phase)) + PrintStatus(podT, sb) + + if last != sb.String() { + log.Infof(sb.String()) + } + if podutils.IsPodReady(podT) && func() bool { + for _, status := range podT.Status.ContainerStatuses { + if !status.Ready { + return false + } + } + return true + }() { + return nil + } + last = sb.String() + } + case <-time.Tick(time.Minute * 60): + return errors.New(fmt.Sprintf("wait pod to be ready timeout")) + } + } +}