Skip to content

Commit

Permalink
Refactor zookeeper controller (#344)
Browse files Browse the repository at this point in the history
Signed-off-by: Wenjie Ma <[email protected]>
  • Loading branch information
euclidgame authored Oct 15, 2023
1 parent 7510b88 commit 77b3530
Show file tree
Hide file tree
Showing 88 changed files with 3,643 additions and 2,821 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::kubernetes_api_objects::prelude::*;
use crate::kubernetes_api_objects::resource::ResourceWrapper;
use crate::reconciler::exec::{io::*, reconciler::*, resource_builder::*};
use crate::reconciler::spec::resource_builder::ResourceBuilder as SpecResourceBuilder;
use crate::vstd_ext::{string_map::StringMap, string_view::*, to_view::*};
use crate::vstd_ext::{string_map::StringMap, string_view::*};
use vstd::{prelude::*, string::*};

verus! {
Expand Down Expand Up @@ -202,15 +202,11 @@ pub fn reconcile_helper<
if resp_o.is_some() && resp_o.as_ref().unwrap().is_k_response()
&& resp_o.as_ref().unwrap().as_k_response_ref().is_create_response()
&& resp_o.as_ref().unwrap().as_k_response_ref().as_create_response_ref().res.is_ok() {
let state_prime = Builder::state_after_create_or_update(resp_o.unwrap().into_k_response().into_create_response().res.unwrap(), state.clone());
let (next_step, req_opt) = next_resource_get_step_and_request(fb, resource);
if state_prime.is_ok() {
let state_prime_with_next_step = FluentBitReconcileState {
reconcile_step: next_step,
..state_prime.unwrap()
};
let req = if req_opt.is_some() { Some(Request::KRequest(KubeAPIRequest::GetRequest(req_opt.unwrap()))) } else { None };
return (state_prime_with_next_step, req);
let next_state = Builder::state_after_create(fb, resp_o.unwrap().into_k_response().into_create_response().res.unwrap(), state.clone());
if next_state.is_ok() {
let (state_prime, req) = next_state.unwrap();
let req_o = if req.is_some() { Some(Request::KRequest(req.unwrap())) } else { None };
return (state_prime, req_o);
}
}
let state_prime = FluentBitReconcileState {
Expand All @@ -224,15 +220,11 @@ pub fn reconcile_helper<
if resp_o.is_some() && resp_o.as_ref().unwrap().is_k_response()
&& resp_o.as_ref().unwrap().as_k_response_ref().is_update_response()
&& resp_o.as_ref().unwrap().as_k_response_ref().as_update_response_ref().res.is_ok() {
let state_prime = Builder::state_after_create_or_update(resp_o.unwrap().into_k_response().into_update_response().res.unwrap(), state.clone());
let (next_step, req_opt) = next_resource_get_step_and_request(fb, resource);
if state_prime.is_ok() {
let state_prime_with_next_step = FluentBitReconcileState {
reconcile_step: next_step,
..state_prime.unwrap()
};
let req = if req_opt.is_some() { Some(Request::KRequest(KubeAPIRequest::GetRequest(req_opt.unwrap()))) } else { None };
return (state_prime_with_next_step, req);
let next_state = Builder::state_after_update(fb, resp_o.unwrap().into_k_response().into_update_response().res.unwrap(), state.clone());
if next_state.is_ok() {
let (state_prime, req) = next_state.unwrap();
let req_o = if req.is_some() { Some(Request::KRequest(req.unwrap())) } else { None };
return (state_prime, req_o);
}
}
let state_prime = FluentBitReconcileState {
Expand All @@ -254,27 +246,4 @@ pub fn reconcile_helper<
}
}

fn next_resource_get_step_and_request(fb: &FluentBit, sub_resource: SubResource) -> (res: (FluentBitReconcileStep, Option<KubeGetRequest>))
requires
fb@.well_formed(),
ensures
res.1.is_Some() == fb_spec::next_resource_get_step_and_request(fb@, sub_resource).1.is_Some(),
res.1.is_Some() ==> res.1.get_Some_0().to_view() == fb_spec::next_resource_get_step_and_request(fb@, sub_resource).1.get_Some_0(),
res.0 == fb_spec::next_resource_get_step_and_request(fb@, sub_resource).0,
{
match sub_resource {
SubResource::ServiceAccount => (after_get_k_request_step(SubResource::Role), Some(RoleBuilder::get_request(fb))),
SubResource::Role => (after_get_k_request_step(SubResource::RoleBinding), Some(RoleBindingBuilder::get_request(fb))),
SubResource::RoleBinding => (after_get_k_request_step(SubResource::DaemonSet), Some(DaemonSetBuilder::get_request(fb))),
_ => (FluentBitReconcileStep::Done, None),
}
}

fn after_get_k_request_step(sub_resource: SubResource) -> (step: FluentBitReconcileStep)
ensures
step == fb_spec::after_get_k_request_step(sub_resource),
{
FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, sub_resource)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::kubernetes_api_objects::{
volume::*,
};
use crate::reconciler::exec::{io::*, reconciler::*};
use crate::vstd_ext::{string_map::StringMap, string_view::*, to_view::*};
use crate::vstd_ext::{string_map::StringMap, string_view::*};
use vstd::prelude::*;
use vstd::seq_lib::*;
use vstd::string::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,27 @@ impl ResourceBuilder<FluentBit, FluentBitReconcileState, spec_resource::DaemonSe
return Err(());
}

fn state_after_create_or_update(obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<FluentBitReconcileState, ()>) {
fn state_after_create(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let ds = DaemonSet::unmarshal(obj);
if ds.is_ok() {
Ok(state)
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::Done,
..state
};
Ok((state_prime, None))
} else {
Err(())
}
}

fn state_after_update(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let ds = DaemonSet::unmarshal(obj);
if ds.is_ok() {
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::Done,
..state
};
Ok((state_prime, None))
} else {
Err(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,29 @@ impl ResourceBuilder<FluentBit, FluentBitReconcileState, spec_resource::RoleBuil
}
}

fn state_after_create_or_update(obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<FluentBitReconcileState, ()>) {
fn state_after_create(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let role = Role::unmarshal(obj);
if role.is_ok() {
Ok(state)
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::RoleBinding),
..state
};
let req = KubeAPIRequest::GetRequest(RoleBindingBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
}

fn state_after_update(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let role = Role::unmarshal(obj);
if role.is_ok() {
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::RoleBinding),
..state
};
let req = KubeAPIRequest::GetRequest(RoleBindingBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use super::common::*;
use crate::external_api::exec::*;
use crate::fluent_controller::fluentbit::common::*;
use crate::fluent_controller::fluentbit::exec::resource::daemon_set::DaemonSetBuilder;
use crate::fluent_controller::fluentbit::exec::resource::role::make_role_name;
use crate::fluent_controller::fluentbit::exec::resource::service_account::make_service_account_name;
use crate::fluent_controller::fluentbit::exec::types::*;
Expand Down Expand Up @@ -51,10 +52,29 @@ impl ResourceBuilder<FluentBit, FluentBitReconcileState, spec_resource::RoleBind
}
}

fn state_after_create_or_update(obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<FluentBitReconcileState, ()>) {
fn state_after_create(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let rb = RoleBinding::unmarshal(obj);
if rb.is_ok() {
Ok(state)
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::DaemonSet),
..state
};
let req = KubeAPIRequest::GetRequest(DaemonSetBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
}

fn state_after_update(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let rb = RoleBinding::unmarshal(obj);
if rb.is_ok() {
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::DaemonSet),
..state
};
let req = KubeAPIRequest::GetRequest(DaemonSetBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,29 @@ impl ResourceBuilder<FluentBit, FluentBitReconcileState, spec_resource::ServiceA
}
}

fn state_after_create_or_update(obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<FluentBitReconcileState, ()>) {
fn state_after_create(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let sa = ServiceAccount::unmarshal(obj);
if sa.is_ok() {
Ok(state)
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::Role),
..state
};
let req = KubeAPIRequest::GetRequest(RoleBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
}

fn state_after_update(fb: &FluentBit, obj: DynamicObject, state: FluentBitReconcileState) -> (res: Result<(FluentBitReconcileState, Option<KubeAPIRequest>), ()>) {
let sa = ServiceAccount::unmarshal(obj);
if sa.is_ok() {
let state_prime = FluentBitReconcileState {
reconcile_step: FluentBitReconcileStep::AfterKRequestStep(ActionKind::Get, SubResource::Role),
..state
};
let req = KubeAPIRequest::GetRequest(RoleBuilder::get_request(fb));
Ok((state_prime, Some(req)))
} else {
Err(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ proof fn lemma_true_leads_to_state_matches_for_all_resources(fb: FluentBitView)
// at_after_get_resource_step(sub_resource) from an arbitrary state.
assert forall |sub_resource: SubResource| sub_resource != SubResource::DaemonSet implies spec.entails(
lift_state(#[trigger] pending_req_in_flight_at_after_get_resource_step(sub_resource, fb))
.leads_to(lift_state(pending_req_in_flight_at_after_get_resource_step(next_resource_get_step_and_request(fb, sub_resource).0.get_AfterKRequestStep_1(), fb)))) by {
.leads_to(lift_state(pending_req_in_flight_at_after_get_resource_step(next_resource_after(sub_resource).get_AfterKRequestStep_1(), fb)))) by {
use_tla_forall_for_sub_resource(spec, sub_resource, fb);
let next_resource = next_resource_get_step_and_request(fb, sub_resource).0.get_AfterKRequestStep_1();
let next_resource = next_resource_after(sub_resource).get_AfterKRequestStep_1();
lemma_from_after_get_resource_step_to_resource_matches(spec, fb, sub_resource, next_resource);
}
// Thanks to the recursive construction of macro.
Expand All @@ -117,7 +117,7 @@ proof fn lemma_true_leads_to_state_matches_for_all_resources(fb: FluentBitView)
assert forall |sub_resource: SubResource| spec.entails(true_pred().leads_to(lift_state(#[trigger] sub_resource_state_matches(sub_resource, fb)))) by {
use_tla_forall_for_sub_resource(spec, sub_resource, fb);
let next_resource = if sub_resource == SubResource::DaemonSet { sub_resource } else {
next_resource_get_step_and_request(fb, sub_resource).0.get_AfterKRequestStep_1()
next_resource_after(sub_resource).get_AfterKRequestStep_1()
};
lemma_from_after_get_resource_step_to_resource_matches(spec, fb, sub_resource, next_resource);
leads_to_trans_temp(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub proof fn lemma_from_after_get_resource_step_to_resource_matches(
lift_state(pending_req_in_flight_at_after_get_resource_step(sub_resource, fb))
.leads_to(lift_state(sub_resource_state_matches(sub_resource, fb)))
),
next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) ==> spec.entails(
next_resource_after(sub_resource) == after_get_k_request_step(next_resource) ==> spec.entails(
lift_state(pending_req_in_flight_at_after_get_resource_step(sub_resource, fb))
.leads_to(lift_state(pending_req_in_flight_at_after_get_resource_step(next_resource, fb)))
),
Expand All @@ -84,7 +84,7 @@ pub proof fn lemma_from_after_get_resource_step_to_resource_matches(
temp_pred_equality(
lift_state(key_not_exists).or(lift_state(key_exists)), lift_state(pending_req_in_flight_at_after_get_resource_step(sub_resource, fb))
);
if next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) {
if next_resource_after(sub_resource) == after_get_k_request_step(next_resource) {
or_leads_to_combine(spec, key_not_exists, key_exists, pending_req_in_flight_at_after_get_resource_step(next_resource, fb));
}
}
Expand Down Expand Up @@ -113,7 +113,7 @@ proof fn lemma_from_after_get_resource_step_and_key_not_exists_to_resource_match
&&& pending_req_in_flight_at_after_get_resource_step(sub_resource, fb)(s)
}).leads_to(lift_state(sub_resource_state_matches(sub_resource, fb)))
),
next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) ==> spec.entails(
next_resource_after(sub_resource) == after_get_k_request_step(next_resource) ==> spec.entails(
lift_state(|s: FBCluster| {
&&& !s.resources().contains_key(get_request(sub_resource, fb).key)
&&& pending_req_in_flight_at_after_get_resource_step(sub_resource, fb)(s)
Expand Down Expand Up @@ -219,7 +219,7 @@ proof fn lemma_from_after_get_resource_step_and_key_not_exists_to_resource_match

// We already have the desired state.
// Now prove the system can successfully enter the next state.
if next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) {
if next_resource_after(sub_resource) == after_get_k_request_step(next_resource) {
let known_ok_resp = |resp_msg: FBMessage| lift_state(resp_msg_is_the_in_flight_ok_resp_at_after_create_resource_step(sub_resource, fb, resp_msg));
let next_state = pending_req_in_flight_at_after_get_resource_step(next_resource, fb);
assert forall |resp_msg| spec.entails(#[trigger] known_ok_resp(resp_msg).leads_to(lift_state(next_state))) by {
Expand Down Expand Up @@ -272,7 +272,7 @@ proof fn lemma_from_after_get_resource_step_and_key_not_exists_to_resource_match
&&& #[trigger] ex.head().in_flight().contains(resp_msg)
&&& Message::resp_msg_matches_req_msg(resp_msg, ex.head().ongoing_reconciles()[fb.object_ref()].pending_req_msg.get_Some_0())
&&& resp_msg.content.get_create_response().res.is_Ok()
&&& state_after_create_or_update(sub_resource, resp_msg.content.get_create_response().res.get_Ok_0(), ex.head().ongoing_reconciles()[fb.object_ref()].local_state).is_Ok()
&&& state_after_create(sub_resource, fb, resp_msg.content.get_create_response().res.get_Ok_0(), ex.head().ongoing_reconciles()[fb.object_ref()].local_state).is_Ok()
};
assert(known_ok_resp(resp_msg).satisfied_by(ex));
}
Expand Down Expand Up @@ -312,7 +312,7 @@ proof fn lemma_from_after_get_resource_step_and_key_exists_to_resource_matches(
&&& pending_req_in_flight_at_after_get_resource_step(sub_resource, fb)(s)
}).leads_to(lift_state(sub_resource_state_matches(sub_resource, fb)))
),
next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) ==> spec.entails(
next_resource_after(sub_resource) == after_get_k_request_step(next_resource) ==> spec.entails(
lift_state(|s: FBCluster| {
&&& s.resources().contains_key(get_request(sub_resource, fb).key)
&&& pending_req_in_flight_at_after_get_resource_step(sub_resource, fb)(s)
Expand Down Expand Up @@ -414,7 +414,7 @@ proof fn lemma_from_after_get_resource_step_and_key_exists_to_resource_matches(

// We already have the desired state.
// Now prove the system can successfully enter the next state.
if next_resource_get_step_and_request(fb, sub_resource).0 == after_get_k_request_step(next_resource) {
if next_resource_after(sub_resource) == after_get_k_request_step(next_resource) {
let known_ok_resp = |resp_msg: FBMessage| lift_state(resp_msg_is_the_in_flight_ok_resp_at_after_update_resource_step(sub_resource, fb, resp_msg));
let next_state = pending_req_in_flight_at_after_get_resource_step(next_resource, fb);
assert forall |resp_msg| spec.entails(#[trigger] known_ok_resp(resp_msg).leads_to(lift_state(next_state))) by {
Expand Down Expand Up @@ -467,7 +467,7 @@ proof fn lemma_from_after_get_resource_step_and_key_exists_to_resource_matches(
&&& #[trigger] ex.head().in_flight().contains(resp_msg)
&&& Message::resp_msg_matches_req_msg(resp_msg, ex.head().ongoing_reconciles()[fb.object_ref()].pending_req_msg.get_Some_0())
&&& resp_msg.content.get_update_response().res.is_Ok()
&&& state_after_create_or_update(sub_resource, resp_msg.content.get_update_response().res.get_Ok_0(), ex.head().ongoing_reconciles()[fb.object_ref()].local_state).is_Ok()
&&& state_after_update(sub_resource, fb, resp_msg.content.get_update_response().res.get_Ok_0(), ex.head().ongoing_reconciles()[fb.object_ref()].local_state).is_Ok()
};
assert(known_ok_resp(resp_msg).satisfied_by(ex));
}
Expand Down
Loading

0 comments on commit 77b3530

Please sign in to comment.