Skip to content
163 changes: 150 additions & 13 deletions controllers/openstackfloatingippool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
clusterv1beta1 "sigs.k8s.io/cluster-api/api/core/v1beta1"
clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
ipamv1 "sigs.k8s.io/cluster-api/api/ipam/v1beta2"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
v1beta1conditions "sigs.k8s.io/cluster-api/util/deprecated/v1beta1/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -72,6 +77,7 @@ type OpenStackFloatingIPPoolReconciler struct {
// +kubebuilder:rbac:groups=infrastructure.cluster.x-k8s.io,resources=openstackfloatingippools/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ipam.cluster.x-k8s.io,resources=ipaddressclaims;ipaddressclaims/status,verbs=get;list;watch;update;create;delete
// +kubebuilder:rbac:groups=ipam.cluster.x-k8s.io,resources=ipaddresses;ipaddresses/status,verbs=get;list;watch;create;update;delete
// +kubebuilder:rbac:groups=cluster.x-k8s.io,resources=clusters;clusters/status,verbs=get;list;watch

func (r *OpenStackFloatingIPPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)
Expand Down Expand Up @@ -127,7 +133,45 @@ func (r *OpenStackFloatingIPPoolReconciler) Reconcile(ctx context.Context, req c

for _, claim := range claims.Items {
log := log.WithValues("claim", claim.Name)

cluster, err := util.GetClusterFromMetadata(ctx, r.Client, claim.ObjectMeta)
if err != nil {
log.Error(err, "Failed to get owning cluster, skipping claim", "claim", claim.Name)
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider a scenario where there's no cluster label on the claim. In that case, we would get ErrNoCluster and, since you have a continue here, the claim would just be skipped completely.

This can cause a problem since any IPAddressClaim without a cluster label will never be processed and never be deleted (since finalizer won't be removed).

I think we should also process the claim normally when the label is not found, as this avoids the regression. That is the current behavior.

}

if annotations.IsPaused(cluster, &claim) {
log.V(4).Info("IPAddressClaim or linked Cluster is paused, skipping reconcile", "claim", claim.Name, "namespace", claim.Namespace)
continue
}

// Add finalizer if it does not exist
if controllerutil.AddFinalizer(&claim, infrav1alpha1.OpenStackFloatingIPPoolFinalizer) {
if err := r.Client.Update(ctx, &claim); err != nil {
log.Error(err, "Failed to add finalizer to claim", "claim", claim.Name)
return ctrl.Result{}, err
}
continue
}

if !claim.DeletionTimestamp.IsZero() {
ipaddressName := claim.Status.AddressRef.Name
if ipaddressName != "" {
ipAddress := &ipamv1.IPAddress{}
err := r.Client.Get(ctx, client.ObjectKey{Name: claim.Name, Namespace: claim.Namespace}, ipAddress)
if err != nil {
return ctrl.Result{}, err
}
err = r.deleteIPAddress(ctx, scope, pool, ipAddress)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to delete IPAddress %q: %w", ipAddress.Name, err)
}
}

controllerutil.RemoveFinalizer(&claim, infrav1alpha1.OpenStackFloatingIPPoolFinalizer)
if err := r.Client.Update(ctx, &claim); err != nil && reterr == nil {
reterr = err
}
Comment thread
archerwu9425 marked this conversation as resolved.
continue
}

Expand Down Expand Up @@ -272,10 +316,6 @@ func (r *OpenStackFloatingIPPoolReconciler) reconcileIPAddresses(ctx context.Con
return err
}

networkingService, err := networking.NewService(scope)
if err != nil {
return err
}
pool.Status.ClaimedIPs = []string{}
if pool.Status.AvailableIPs == nil {
pool.Status.AvailableIPs = []string{}
Expand All @@ -288,18 +328,40 @@ func (r *OpenStackFloatingIPPoolReconciler) reconcileIPAddresses(ctx context.Con
continue
}

if controllerutil.ContainsFinalizer(ipAddress, infrav1alpha1.DeleteFloatingIPFinalizer) {
if pool.Spec.ReclaimPolicy == infrav1alpha1.ReclaimDelete && !contains(pool.Spec.PreAllocatedFloatingIPs, ipAddress.Spec.Address) {
if err = networkingService.DeleteFloatingIP(pool, ipAddress.Spec.Address); err != nil {
return fmt.Errorf("delete floating IP %q: %w", ipAddress.Spec.Address, err)
// Check if the owning claim or its cluster is paused before processing deletion,
// and clear the claim's AddressRef so it can be re-reconciled once unpaused or re-created.
claim := &ipamv1.IPAddressClaim{}
if ipAddress.Spec.ClaimRef.Name == "" {
claim = nil
} else {
if err := r.Client.Get(ctx, client.ObjectKey{Name: ipAddress.Spec.ClaimRef.Name, Namespace: ipAddress.Namespace}, claim); err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get IPAddressClaim %q: %w", ipAddress.Spec.ClaimRef.Name, err)
}
claim = nil
} else {
pool.Status.AvailableIPs = append(pool.Status.AvailableIPs, ipAddress.Spec.Address)
cluster, err := util.GetClusterFromMetadata(ctx, r.Client, claim.ObjectMeta)
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get owning cluster for claim %q: %w", claim.Name, err)
}
if cluster != nil && annotations.IsPaused(cluster, claim) {
scope.Logger().V(4).Info("IPAddress owner IPAddressClaim or linked Cluster is paused, skipping deletion", "ipAddress", ipAddress.Name, "claim", claim.Name)
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means the deletion will be skipped. So, the floating IP exists in OpenStack. And the following code at line 367 will mark this as available. This might cause the IP to be allocated to two different claims.

Fix: add this to ClaimedIPs. That way, this IP won't be utilized by any other resource.

}
}
}
controllerutil.RemoveFinalizer(ipAddress, infrav1alpha1.DeleteFloatingIPFinalizer)
if err := r.Client.Update(ctx, ipAddress); err != nil {
return err

err := r.deleteIPAddress(ctx, scope, pool, ipAddress)
if err != nil {
return fmt.Errorf("failed to delete IPAddress %q: %w", ipAddress.Name, err)
}

// Clear AddressRef so the claim will be re-assigned an IP on the next reconcile.
if claim != nil && claim.Status.AddressRef.Name != "" {
claim.Status.AddressRef.Name = ""
if err := r.Client.Status().Update(ctx, claim); err != nil {
return fmt.Errorf("failed to clear AddressRef for claim %q: %w", claim.Name, err)
Comment thread
archerwu9425 marked this conversation as resolved.
}
}
}
allIPs := union(pool.Status.AvailableIPs, pool.Spec.PreAllocatedFloatingIPs)
Expand All @@ -308,6 +370,30 @@ func (r *OpenStackFloatingIPPoolReconciler) reconcileIPAddresses(ctx context.Con
return nil
}

// deleteIPAddress releases the floating IP backing ipAddress according to the
// pool's reclaim policy and removes the DeleteFloatingIPFinalizer so the
// IPAddress object can be garbage collected.
//
// With ReclaimDelete, addresses that are not pre-allocated are deleted from
// OpenStack; otherwise the address is appended to the pool's AvailableIPs so
// it can be handed out to a future claim.
func (r *OpenStackFloatingIPPoolReconciler) deleteIPAddress(ctx context.Context, scope *scope.WithLogger, pool *infrav1alpha1.OpenStackFloatingIPPool, ipAddress *ipamv1.IPAddress) error {
	if controllerutil.ContainsFinalizer(ipAddress, infrav1alpha1.DeleteFloatingIPFinalizer) {
		if pool.Spec.ReclaimPolicy == infrav1alpha1.ReclaimDelete && !contains(pool.Spec.PreAllocatedFloatingIPs, ipAddress.Spec.Address) {
			// Only construct the OpenStack networking client on the path that
			// actually talks to OpenStack; a failure here must not block
			// finalizer removal for IPs we are keeping anyway.
			networkingService, err := networking.NewService(scope)
			if err != nil {
				return err
			}
			if err := networkingService.DeleteFloatingIP(pool, ipAddress.Spec.Address); err != nil {
				return fmt.Errorf("delete floating IP %q: %w", ipAddress.Spec.Address, err)
			}
		} else {
			// Retain the floating IP and return it to the pool.
			pool.Status.AvailableIPs = append(pool.Status.AvailableIPs, ipAddress.Spec.Address)
		}
	}

	// Only issue an API update when the finalizer was actually present.
	if controllerutil.RemoveFinalizer(ipAddress, infrav1alpha1.DeleteFloatingIPFinalizer) {
		if err := r.Client.Update(ctx, ipAddress); err != nil {
			return err
		}
	}

	return nil
}

func (r *OpenStackFloatingIPPoolReconciler) getIP(ctx context.Context, scope *scope.WithLogger, pool *infrav1alpha1.OpenStackFloatingIPPool) (string, error) {
// There's a potential leak of IPs here, if the reconcile loop fails after we claim an IP but before we create the IPAddress object.
var ip string
Expand Down Expand Up @@ -422,14 +508,27 @@ func (r *OpenStackFloatingIPPoolReconciler) reconcileFloatingIPNetwork(scope *sc
return nil
}

func (r *OpenStackFloatingIPPoolReconciler) ipAddressClaimToPoolMapper(_ context.Context, o client.Object) []ctrl.Request {
func (r *OpenStackFloatingIPPoolReconciler) ipAddressClaimToPoolMapper(ctx context.Context, o client.Object) []ctrl.Request {
log := ctrl.LoggerFrom(ctx)
claim, ok := o.(*ipamv1.IPAddressClaim)
if !ok {
panic(fmt.Sprintf("Expected a IPAddressClaim but got a %T", o))
}
if claim.Spec.PoolRef.Kind != openStackFloatingIPPool {
return nil
}

cluster, err := util.GetClusterFromMetadata(ctx, r.Client, claim.ObjectMeta)
if err != nil {
log.Error(err, "Failed to get owning cluster, skipping mapping", "claim", claim.Name, "namespace", claim.Namespace)
return nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue as above. If the claim doesn't have a cluster label, GetClusterFromMetadata returns ErrNoCluster, and this returns nil. Meaning, changes to this claim will never trigger a pool reconciliation. The controller won't even notice the claim was created, updated, or deleted.

fix: If there's no cluster, we should still map the event to the pool so reconciliation proceeds normally.

}

if annotations.IsPaused(cluster, claim) {
log.V(4).Info("IPAddressClaim or linked Cluster is paused, skipping mapping", "claim", claim.Name, "namespace", claim.Namespace)
return nil
}

return []ctrl.Request{
{
NamespacedName: client.ObjectKey{
Expand All @@ -440,6 +539,39 @@ func (r *OpenStackFloatingIPPoolReconciler) ipAddressClaimToPoolMapper(_ context
}
}

// clusterToPoolMapper maps a Cluster event to reconcile requests for every
// OpenStackFloatingIPPool referenced by the cluster's IPAddressClaims.
// Claims bound to other pool kinds and paused claims are ignored, and
// duplicate pool references collapse into a single request.
func (r *OpenStackFloatingIPPoolReconciler) clusterToPoolMapper(ctx context.Context, o client.Object) []ctrl.Request {
	log := ctrl.LoggerFrom(ctx)
	cluster, ok := o.(*clusterv1.Cluster)
	if !ok {
		panic(fmt.Sprintf("Expected a Cluster but got a %T", o))
	}

	// Fetch every IPAddressClaim labeled as belonging to this cluster.
	claimList := &ipamv1.IPAddressClaimList{}
	err := r.Client.List(ctx, claimList,
		client.InNamespace(cluster.Namespace),
		client.MatchingLabels{clusterv1.ClusterNameLabel: cluster.Name})
	if err != nil {
		log.Error(err, "Failed to list IPAddressClaims for cluster, skipping mapping", "cluster", cluster.Name, "namespace", cluster.Namespace)
		return nil
	}

	// Deduplicate pool references with a set keyed by namespace/name.
	seen := make(map[client.ObjectKey]struct{})
	requests := make([]ctrl.Request, 0, len(claimList.Items))
	for i := range claimList.Items {
		claim := &claimList.Items[i]
		if claim.Spec.PoolRef.Kind != openStackFloatingIPPool || annotations.IsPaused(cluster, claim) {
			continue
		}
		key := client.ObjectKey{Name: claim.Spec.PoolRef.Name, Namespace: claim.Namespace}
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		requests = append(requests, ctrl.Request{NamespacedName: key})
	}

	return requests
}

func (r *OpenStackFloatingIPPoolReconciler) ipAddressToPoolMapper(_ context.Context, o client.Object) []ctrl.Request {
ip, ok := o.(*ipamv1.IPAddress)
if !ok {
Expand Down Expand Up @@ -485,6 +617,11 @@ func (r *OpenStackFloatingIPPoolReconciler) SetupWithManager(ctx context.Context
&ipamv1.IPAddressClaim{},
handler.EnqueueRequestsFromMapFunc(r.ipAddressClaimToPoolMapper),
).
Watches(
&clusterv1.Cluster{},
handler.EnqueueRequestsFromMapFunc(r.clusterToPoolMapper),
builder.WithPredicates(predicates.ClusterUnpaused(mgr.GetScheme(), ctrl.LoggerFrom(ctx))),
).
Watches(
&ipamv1.IPAddress{},
handler.EnqueueRequestsFromMapFunc(r.ipAddressToPoolMapper),
Expand Down
Loading