Skip to content
Commits on Source (2)
......@@ -3,22 +3,28 @@ package core
import (
"context"
"fmt"
"sort"
"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"
appv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
corev1alpha1 "libre.sh/controller/apis/core/v1alpha1"
"libre.sh/controller/internal"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// RedisReconciler reconciles a Redis object
......@@ -89,12 +95,12 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get", "list", "watch", "patch"},
Verbs: []string{"get", "list", "watch"},
},
{
APIGroups: []string{"coordination.k8s.io"},
Resources: []string{"leases"},
Verbs: []string{"*"},
APIGroups: []string{""},
Resources: []string{"endpoints"},
Verbs: []string{"get"},
},
}
return nil
......@@ -129,9 +135,8 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
service.SetNamespace(redis.Namespace)
_, err = ctrl.CreateOrUpdate(ctx, r.Client, &service, func() error {
service.Labels = labels
service.Spec = v1.ServiceSpec{
Selector: labels,
Ports: []v1.ServicePort{
service.Spec = corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{
Port: 6379,
TargetPort: intstr.FromInt(6379),
......@@ -144,46 +149,89 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
return
}
endpoints := corev1.Endpoints{}
endpoints.Name = fmt.Sprintf("%s-keydb", redis.Name)
endpoints.SetNamespace(redis.Namespace)
_, err = ctrl.CreateOrUpdate(ctx, r.Client, &endpoints, func() error {
ip := getEndpointsIP(&endpoints)
mainPod, err := r.getMainPod(ctx, &redis, ip)
if err != nil {
return err
}
endpoints.Labels = labels
endpoints.Subsets = []corev1.EndpointSubset{}
if mainPod != nil {
endpoints.Subsets = []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{
{
IP: mainPod.Status.PodIP,
Hostname: mainPod.Name,
TargetRef: &corev1.ObjectReference{
Kind: mainPod.Kind,
Namespace: mainPod.Namespace,
Name: mainPod.Name,
UID: mainPod.UID,
},
},
},
Ports: []corev1.EndpointPort{
{
Port: 6379,
Protocol: corev1.ProtocolTCP,
},
},
},
}
}
return nil
})
if err != nil {
return
}
sts := appv1.StatefulSet{}
sts.Name = fmt.Sprintf("keydb-%s", redis.Name)
sts.Namespace = redis.Namespace
_, err = ctrl.CreateOrUpdate(ctx, r.Client, &sts, func() error {
sts.Labels = labels
replicas := int32(1)
replicas := int32(2)
sts.Spec.Replicas = &replicas
sts.Spec.ServiceName = service.Name
sts.Spec.Selector = &metav1.LabelSelector{
MatchLabels: labels,
}
probe := v1.Probe{
ProbeHandler: v1.ProbeHandler{
TCPSocket: &v1.TCPSocketAction{
Port: intstr.FromInt(6379),
probe := corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/healthz",
Port: intstr.FromInt(80),
},
},
}
sts.Spec.Template = v1.PodTemplateSpec{
sts.Spec.Template = corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: v1.PodSpec{
Spec: corev1.PodSpec{
ServiceAccountName: serviceAccount.GetName(),
Containers: []v1.Container{
Containers: []corev1.Container{
{
Name: "keydb",
Image: "eqalpha/keydb:alpine_x86_64_v6.3.1",
Args: []string{"--active-replica", "yes", "--multi-master", "yes"},
ReadinessProbe: &probe,
LivenessProbe: &probe,
StartupProbe: &corev1.Probe{
PeriodSeconds: 1,
FailureThreshold: 30,
ProbeHandler: probe.ProbeHandler,
Name: "keydb",
Image: "eqalpha/keydb:alpine_x86_64_v6.3.1",
Args: []string{"--active-replica", "yes", "--multi-master", "yes"},
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/graceful-shutdown/keydb",
Port: intstr.FromInt(80),
},
},
},
},
{
Name: "manager",
Image: "libresh/keydb-manager:v0.0.1",
Image: "libresh/keydb-manager:v0.0.2",
Env: []corev1.EnvVar{
{
Name: "NAMESPACE",
......@@ -198,6 +246,21 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
Value: redis.Name,
},
},
Lifecycle: &corev1.Lifecycle{
PreStop: &corev1.LifecycleHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/graceful-shutdown/manager",
Port: intstr.FromInt(80),
},
},
},
ReadinessProbe: &probe,
LivenessProbe: &probe,
StartupProbe: &corev1.Probe{
PeriodSeconds: 1,
FailureThreshold: 30,
ProbeHandler: probe.ProbeHandler,
},
},
},
},
......@@ -232,5 +295,72 @@ func (r *RedisReconciler) Reconcile(ctx context.Context, req ctrl.Request) (resu
func (r *RedisReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1alpha1.Redis{}).
Watches(
&source.Kind{Type: &corev1.Pod{}},
handler.EnqueueRequestsFromMapFunc(r.findObjectsForPod),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
Complete(r)
}
func (r *RedisReconciler) findObjectsForPod(pod client.Object) []reconcile.Request {
requests := []reconcile.Request{}
labels := pod.GetLabels()
if labels["app.kubernetes.io/managed-by"] == "libre.sh" &&
labels["app.kubernetes.io/name"] == "keydb" &&
labels["app.kubernetes.io/instance"] != "" {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: labels["app.kubernetes.io/instance"],
Namespace: pod.GetNamespace(),
},
})
}
return requests
}
func (r *RedisReconciler) getMainPod(ctx context.Context, redis *corev1alpha1.Redis, ip string) (*corev1.Pod, error) {
pods := corev1.PodList{}
err := r.List(ctx, &pods, client.InNamespace(redis.Namespace), client.MatchingLabels{
"app.kubernetes.io/name": "keydb",
"app.kubernetes.io/instance": redis.Name,
"app.kubernetes.io/managed-by": "libre.sh",
})
if err != nil {
return nil, err
}
readyPods := []corev1.Pod{}
for _, pod := range pods.Items {
if !pod.DeletionTimestamp.IsZero() {
continue
}
for _, condition := range pod.Status.Conditions {
if condition.Type == "Ready" && condition.Status == "True" {
readyPods = append(readyPods, pod)
}
}
}
if ip != "" {
for _, pod := range readyPods {
if pod.Status.PodIP == ip {
return &pod, nil
}
}
}
sort.Slice(readyPods, func(i, j int) bool {
return readyPods[i].Name < readyPods[j].Name
})
if len(readyPods) > 0 {
return &readyPods[0], nil
}
return nil, nil
}
func getEndpointsIP(endpoints *corev1.Endpoints) string {
for _, subset := range endpoints.Subsets {
for _, addresse := range subset.Addresses {
return addresse.IP
}
}
return ""
}
......@@ -10,7 +10,6 @@ import (
"github.com/fluxcd/pkg/runtime/conditions"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
......@@ -199,23 +198,19 @@ func (r *SAMLClientReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *SAMLClientReconciler) findObjectsForSecret(secret client.Object) []reconcile.Request {
list := &corev1alpha1.SAMLClientList{}
listOps := &client.ListOptions{
Namespace: secret.GetNamespace(),
FieldSelector: fields.OneTermEqualSelector(".metadata.name", secret.GetName()),
}
err := r.List(context.TODO(), list, listOps)
err := r.List(context.TODO(), list, client.InNamespace(secret.GetNamespace()))
if err != nil {
return []reconcile.Request{}
}
requests := make([]reconcile.Request, len(list.Items))
for i, item := range list.Items {
requests[i] = reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
requests := []reconcile.Request{}
for _, item := range list.Items {
if secret.GetName() == fmt.Sprintf("%s.user.realm.libre.sh", secret.GetNamespace()) || secret.GetName() == item.GetName() {
requests = append(requests, reconcile.Request{
NamespacedName: types.NamespacedName{
Name: item.GetName(),
Namespace: item.GetNamespace(),
},
})
}
}
return requests
......