diff --git a/admyral/cli/workflow.py b/admyral/cli/workflow.py index 0ee30290..f4c5accf 100644 --- a/admyral/cli/workflow.py +++ b/admyral/cli/workflow.py @@ -3,7 +3,6 @@ import json from admyral.cli.cli import cli -from admyral.compiler.workflow_compiler import WorkflowCompiler from admyral.models import TriggerStatus from admyral.client import AdmyralClient from admyral.utils.telemetry import capture @@ -14,17 +13,13 @@ def workflow() -> None: """Workflow Management""" -# TODO: add option for pushing all used Python actions automatically @workflow.command( "push", help="Push a workflow to Admyral", ) -@click.argument("workflow_name", type=str) -@click.option( - "--file", - "-f", +@click.argument( + "file", type=str, - help="Path to the Python file containing the workflow", ) @click.option( "--activate", @@ -32,7 +27,7 @@ def workflow() -> None: help="Activate the workflow after pushing it to Admyral", ) @click.pass_context -def push(ctx: click.Context, workflow_name: str, file: str, activate: bool) -> None: +def push(ctx: click.Context, file: str, activate: bool) -> None: """Push workflow to Admyral""" capture(event_name="workflow:push") client: AdmyralClient = ctx.obj @@ -43,19 +38,18 @@ def push(ctx: click.Context, workflow_name: str, file: str, activate: bool) -> N return with open(file, "r") as f: workflow_code = f.read() - workflow_dag = WorkflowCompiler().compile_from_module(workflow_code, workflow_name) # Push workflow to Admyral try: workflow_push_response = client.push_workflow( - workflow_name=workflow_name, workflow_dag=workflow_dag, is_active=activate + workflow_code=workflow_code, is_active=activate ) except Exception as e: - click.echo(f"Failed to push workflow {workflow_name}.") + click.echo(f"Failed to push workflow from {file}.") click.echo(f"Error: {e}") return - click.echo(f"Workflow {workflow_name} pushed successfully.") + click.echo("Workflow pushed successfully.") if workflow_push_response.webhook_id: click.echo(f"Webhook ID: {workflow_push_response.webhook_id}") diff --git a/admyral/client.py b/admyral/client.py index b968af87..ff46dee3 100644 --- a/admyral/client.py +++ b/admyral/client.py @@ -3,7 +3,6 @@ from admyral.models import ( Workflow, - WorkflowDAG, PythonAction, WorkflowPushResponse, Secret, @@ -98,7 +97,7 @@ def get_workflow(self, workflow_name: str) -> Workflow | None: return Workflow.model_validate(result) if result else None def push_workflow( - self, workflow_name: str, workflow_dag: WorkflowDAG, is_active: bool = False + self, workflow_code: str, is_active: bool = False ) -> WorkflowPushResponse: """ Pushes the workflow to the server. @@ -111,9 +110,9 @@ def push_workflow( if the workflow has webhook enabled. 
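For orientation, this is roughly how the reworked push path is driven once compilation moves server-side: the CLI (and any script using the client) just reads the YAML file and ships its raw contents. Only push_workflow's new signature and the webhook_id field are taken from this diff; the file path below is a placeholder.

# Sketch: pushing a YAML workflow file with the updated client API.
# CLI equivalent (per the example docstrings later in this PR):
#   admyral workflow push workflows/my_workflow.yaml --activate
from admyral.client import AdmyralClient


def push_workflow_file(client: AdmyralClient, path: str, activate: bool = False) -> None:
    with open(path, "r") as f:
        workflow_code = f.read()
    response = client.push_workflow(workflow_code=workflow_code, is_active=activate)
    print("Workflow pushed successfully.")
    if response.webhook_id:
        print(f"Webhook ID: {response.webhook_id}")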
""" response = self._post( - f"{API_V1_STR}/workflows/{workflow_name}/push", + f"{API_V1_STR}/workflows/push", WorkflowPushRequest( - workflow_dag=workflow_dag, + workflow_code=workflow_code, activate=is_active, ).model_dump(), ) diff --git a/admyral/compiler/workflow_compiler.py b/admyral/compiler/workflow_compiler.py index 045e71ad..f69adeb6 100644 --- a/admyral/compiler/workflow_compiler.py +++ b/admyral/compiler/workflow_compiler.py @@ -1,17 +1,9 @@ import ast -import inspect -from typing import TypeVar, Callable, Any, TYPE_CHECKING +from typing import TYPE_CHECKING import astor -from collections import deque, defaultdict -from copy import deepcopy -from enum import Enum -import tempfile -from admyral.action_registry import ActionRegistry from admyral.typings import JsonValue from admyral.models import ( - NodeBase, - ActionNode, UnaryOperator, BinaryOperator, Condition, @@ -20,395 +12,30 @@ BinaryConditionExpression, AndConditionExpression, OrConditionExpression, - IfNode, - WorkflowDAG, - WorkflowStart, - WorkflowWebhookTrigger, - WorkflowScheduleTrigger, - WorkflowDefaultArgument, ) -from admyral.compiler.condition_compiler import condition_to_str if TYPE_CHECKING: - from admyral.workflow import Workflow - - -F = TypeVar("F", bound=Callable[..., Any]) - - -START_NODE_ID: str = "start" - - -class EdgeType(str, Enum): - DEFAULT = "default" - TRUE = "true" - FALSE = "false" + pass # TODO: Introduce WorkflowCompileError class WorkflowCompiler: - def __init__(self) -> None: - self.variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]] = ( - defaultdict(list) - ) - self.nodes: dict[str, NodeBase] = {} - - def compile(self, workflow: "Workflow") -> WorkflowDAG: - if inspect.iscoroutinefunction(workflow.func): - raise RuntimeError("Async workflows are not yet supported.") - - tree = ast.parse(inspect.getsource(workflow.func)) - function_node = tree.body[0] - - start_node = ActionNode.build_start_node() - self.nodes[START_NODE_ID] = start_node - - self._check_function_header_for_payload_parameter(workflow.func) - self.variable_to_nodes_mapping["payload"] = [(START_NODE_ID, EdgeType.DEFAULT)] - # Compile the workflow - self._compile_body(function_node.body) - - return self._into_dag(workflow) - - def compile_from_module(self, module_code: str, workflow_name: str) -> WorkflowDAG: - with tempfile.NamedTemporaryFile() as tmp: - tmp.write(module_code.encode()) - tmp.seek(0) - compiled_code = compile(module_code, tmp.name, "exec") - module_namespace = {} - exec(compiled_code, module_namespace) - return module_namespace[workflow_name].compile() - def compile_if_condition_str(self, condition: str) -> Condition: tree = ast.parse(condition) if len(tree.body) != 1: raise ValueError("Invalid condition string.") - cond, _ = self._compile_condition_expression(tree.body[0].value) - return cond - - def _handle_triggers( - self, workflow: "Workflow" - ) -> list[WorkflowWebhookTrigger | WorkflowScheduleTrigger]: - from admyral.workflow import Webhook, Schedule - - contains_webhook = False - triggers = [] - for trigger in workflow.triggers: - default_args = [ - WorkflowDefaultArgument(name=k, value=v) - for k, v in trigger.default_args.items() - ] - - if isinstance(trigger, Webhook): - if contains_webhook: - raise ValueError("A workflow can only have one webhook trigger.") - contains_webhook = True - triggers.append(WorkflowWebhookTrigger(default_args=default_args)) - continue - - if isinstance(trigger, Schedule): - if trigger.cron: - triggers.append( - WorkflowScheduleTrigger( - cron=trigger.cron, 
default_args=default_args - ) - ) - continue - - if trigger.interval_seconds: - triggers.append( - WorkflowScheduleTrigger( - interval_seconds=trigger.interval_seconds, - default_args=default_args, - ) - ) - continue - - if trigger.interval_minutes: - triggers.append( - WorkflowScheduleTrigger( - interval_minutes=trigger.interval_minutes, - default_args=default_args, - ) - ) - continue - - if trigger.interval_hours: - triggers.append( - WorkflowScheduleTrigger( - interval_hours=trigger.interval_hours, - default_args=default_args, - ) - ) - continue - - if trigger.interval_days: - triggers.append( - WorkflowScheduleTrigger( - interval_days=trigger.interval_days, - default_args=default_args, - ) - ) - continue - - return triggers - - def _into_dag(self, workflow: "Workflow") -> WorkflowDAG: - return WorkflowDAG( - name=workflow.name, - description=workflow.description, - controls=self._filter_empty_controls(workflow.controls), - start=WorkflowStart( - triggers=self._handle_triggers(workflow), - ), - dag=self.nodes, - ) - - def _filter_empty_controls(self, controls: list[str] | None) -> list[str] | None: - if not controls: - return None - return [control for control in controls if control] - - def _compute_unique_node_id(self, base: str) -> str: - if base not in self.nodes: - return base - for idx in range(1, 1_000): - id = f"{base}_{idx}" - if id not in self.nodes: - return id - raise RuntimeError(f"Could not find a unique id for node {base}") - - def _check_function_header_for_payload_parameter(self, function: F) -> None: - workflow_args = inspect.signature(function).parameters - if len(workflow_args) != 1 or "payload" not in workflow_args: - raise RuntimeError( - f'Failed to compile workflow "{function.__name__}" because the workflow function must have exactly one parameter called "payload" and no default value: payload: dict[str, JsonValue]' - ) - - arg_name = workflow_args["payload"].name - annotation = str(workflow_args["payload"].annotation) - default = workflow_args["payload"].default - - if ( - arg_name != "payload" - or annotation != "dict[str, JsonValue]" - or default != inspect.Parameter.empty - ): - raise RuntimeError( - f'Failed to compile workflow "{function.__name__}" because the workflow function must have exactly one parameter called "payload" and no default value: payload: dict[str, JsonValue]' - ) - - def _connect_node_to_dependencies( - self, - node_id: str, - dependencies: set[str], - variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]], - ) -> None: - for dependency_variable in dependencies: - for parent_node_id, edge_type in variable_to_nodes_mapping[ - dependency_variable - ]: - match edge_type: - case EdgeType.DEFAULT: - self.nodes[parent_node_id].add_edge(node_id) - case EdgeType.TRUE: - self.nodes[parent_node_id].add_true_edge(node_id) - case EdgeType.FALSE: - self.nodes[parent_node_id].add_false_edge(node_id) - - def _compile_body(self, body: list[ast.stmt]) -> None: - for statement in body: - if isinstance(statement, ast.Assign): - self._compile_assign(statement) - - elif isinstance(statement, ast.If): - if_node_id, emitted_variables, leaf_nodes, if_block_dependencies = ( - self._compile_if(statement, self.variable_to_nodes_mapping) - ) - - # Connect the if-condition node to the dependencies of the if-block - if len(if_block_dependencies) > 0: - self._connect_node_to_dependencies( - if_node_id, - if_block_dependencies, - self.variable_to_nodes_mapping, - ) - else: - # No dependencies, add edge from start node - self.nodes[0].add_edge(if_node_id) - - # 
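Stepping back from the removed DAG-building code: after this change the compiler's remaining public surface is essentially condition parsing. A hedged sketch of calling it is below; the condition string is invented for illustration.

# Illustrative only: parse an if-condition string into a Condition model with
# the slimmed-down compiler. The condition text is made up for this example.
from admyral.compiler.workflow_compiler import WorkflowCompiler

condition = WorkflowCompiler().compile_if_condition_str(
    "logs is not None and payload['severity'] == 'high'"
)
print(type(condition).__name__)  # expected: AndConditionExpression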
If an action depends on a variable emitted by the if-statement, then we - # need to conect the action to all the leaf nodes to guarantee for - # correct execution order because the entire if-condition block must be executed - # before the action. - for emitted_variable in emitted_variables: - self.variable_to_nodes_mapping[emitted_variable] = leaf_nodes - - elif isinstance(statement, ast.Expr): - self._compile_expr(statement) - - else: - raise RuntimeError( - f"Unsupported statement in workflow function: {astor.to_source(statement)}" - ) - - def _is_descendant(self, node: str, target: str) -> bool: - """ - Does a path from node to target exist in the DAG? - """ - stack = [node] - while len(stack) > 0: - current = stack.pop() - if current == target: - return True - if isinstance(self.nodes[current], IfNode): - stack.extend(self.nodes[current].true_children) - stack.extend(self.nodes[current].false_children) - else: - stack.extend(self.nodes[current].children) - return False + return self._compile_condition_expression(tree.body[0].value) - def _remove_transitive_dependencies( - self, - dependencies: set[str], - variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]], - ) -> set[str]: - """ - Suppose we have an action A which has dependencies on B and C while B has a dependency on C. - - C -- - | | - v | - B | - | | - v | - A <- - - Due to the transitive nature of dependencies, we can remove C from the dependencies of A - because due to the dependency on B, A already has an implicit dependency on C. - - C --> B --> A - """ - removal_set = set() - for var1 in dependencies: - # Why is [0] sufficient? - # Case 1: if var1 is emitted by an action, then len(variable_to_nodes_mapping[var1]) == 1 - # Case 2: if var1 is emitted by an if-condition, then len(variable_to_nodes_mapping[var1]) > 1 - # because we remember the leaf nodes of the if-condition block as representative for var1. - # If an action depends on var1, then it must depend on all the leaf nodes - # and, therefore, connects to all of the leaf nodes. Hence, doing one hop starting from any leaf node - # will lead to the same action nodes. Consequently, we can just use [0] to get the first representative. - ancestor_node_id, _ = variable_to_nodes_mapping[var1][0] - for var2 in dependencies: - descendant_node_id, _ = variable_to_nodes_mapping[var2][0] - if ancestor_node_id == descendant_node_id: - continue - - if self._is_descendant(ancestor_node_id, descendant_node_id): - # if var1 is a descendant of var2, then we can remove var1 because - # dependencies are transitive - removal_set.add(var1) - - return dependencies - removal_set - - def _compile_call( - self, - call: ast.Call, - variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]], - ) -> tuple[str, set[str]]: - function_name = call.func.id - function_args = call.args - function_kwargs = call.keywords - - if len(function_args) > 0: - raise RuntimeError( - f"Failed to compile {function_name} because positional arguments are not supported. Please use named arguments instead." - ) - - # Check if function_name is a registered action - if not ActionRegistry.is_registered(function_name): - raise RuntimeError( - f'Function "{function_name}" is not an action. Please register the function as an action if you want to use it in the workflow.' 
- ) - - action_secrets_mapping = {} - dependencies = set() - args = {} - - processed_kwargs = set() - for kwarg in function_kwargs: - # check for duplicate named arguments - if kwarg.arg in processed_kwargs: - raise RuntimeError( - f"Failed to compile {function_name} due to duplicate named argument: {kwarg.arg}" - ) - processed_kwargs.add(kwarg.arg) - - if kwarg.arg == "run_after": - # add the dependencies enforced by the run_after argument - run_after = kwarg.value.elts - for dependency in run_after: - if dependency.id not in variable_to_nodes_mapping: - raise RuntimeError( - f"Failed to compile {function_name} due to unknown dependency: {dependency.id}" - ) - dependencies.add(dependency.id) - continue - - if kwarg.arg == "secrets": - # extract the secrets mapping from the secrets argument - action_secrets_mapping = { - secret_placeholder.value: secret_id.value - for secret_placeholder, secret_id in zip( - kwarg.value.keys, kwarg.value.values - ) - } - continue - - # collect dependencies and construct input args - arg_name = kwarg.arg - arg_value = kwarg.value - new_dependencies, args[arg_name] = self._compile_value(arg_value) - dependencies |= new_dependencies - - # check that for each placeholder a secret is specified - registered_action = ActionRegistry.get(function_name) - if len(action_secrets_mapping) != len(registered_action.secrets_placeholders): - raise RuntimeError( - f"Failed to compile {function_name} because provided secrets do not match secrets placeholders." - ) - for secret_placeholder in registered_action.secrets_placeholders: - if secret_placeholder not in action_secrets_mapping: - raise RuntimeError( - f"Failed to compile {function_name} because for placeholder {secret_placeholder} no secret is defined." - ) - - # Remove transitive dependencies to avoid redundant edges in the DAG. - dependencies = self._remove_transitive_dependencies( - dependencies, variable_to_nodes_mapping - ) - - node_id = self._compute_unique_node_id(function_name) - node = ActionNode( - id=node_id, - type=function_name, - args=args, - secrets_mapping=action_secrets_mapping, - ) - self.nodes[node_id] = node - - return node_id, dependencies - - def _compile_value(self, expr: ast.expr) -> tuple[set[str], JsonValue]: + def _compile_value(self, expr: ast.expr) -> JsonValue: if isinstance(expr, ast.Name): - return {expr.id}, f"{{{{ {expr.id} }}}}" + return f"{{{{ {expr.id} }}}}" if isinstance(expr, ast.Subscript): variable_id, access_path = self._compile_subscript(expr) - return {variable_id}, access_path + return access_path if isinstance(expr, ast.Constant): - return set(), expr.value + return expr.value if isinstance(expr, ast.List): return self._compile_list(expr) @@ -416,27 +43,21 @@ def _compile_value(self, expr: ast.expr) -> tuple[set[str], JsonValue]: if isinstance(expr, ast.Dict): return self._compile_dict(expr) - if isinstance(expr, ast.JoinedStr): - return self._compile_joined_str(expr) - raise RuntimeError( f"Failed to compile {astor.to_source(expr)}. Unsupported argument type." 
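As a side note on the signature change above: _compile_value now returns the compiled JSON value directly rather than a (dependencies, value) tuple. A rough illustration of its behavior on names and constants follows; calling the private helper directly is purely for demonstration.

# Rough sketch (not from the diff): names compile to "{{ name }}" reference
# strings, while constants pass through unchanged.
import ast

from admyral.compiler.workflow_compiler import WorkflowCompiler

compiler = WorkflowCompiler()
name_expr = ast.parse("logs", mode="eval").body
const_expr = ast.parse("'action:org.update_member'", mode="eval").body
print(compiler._compile_value(name_expr))   # {{ logs }}
print(compiler._compile_value(const_expr))  # action:org.update_member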
) - def _compile_list(self, list_expr: ast.List) -> tuple[set[str], JsonValue]: + def _compile_list(self, list_expr: ast.List) -> JsonValue: compiled_json_array = [] - dependencies = set() for value in list_expr.elts: deps, compiled_value = self._compile_value(value) - dependencies |= deps compiled_json_array.append(compiled_value) - return dependencies, compiled_json_array + return compiled_json_array - def _compile_dict(self, dict_expr: ast.Dict) -> tuple[set[str], JsonValue]: + def _compile_dict(self, dict_expr: ast.Dict) -> JsonValue: compiled_json_object = {} - dependencies = set() for key, value in zip(dict_expr.keys, dict_expr.values): if not isinstance(key, (ast.Constant, ast.Name, ast.Subscript)): @@ -447,113 +68,9 @@ def _compile_dict(self, dict_expr: ast.Dict) -> tuple[set[str], JsonValue]: deps_key, compiled_key = self._compile_value(key) deps_value, compiled_value = self._compile_value(value) - dependencies |= deps_key - dependencies |= deps_value - compiled_json_object[compiled_key] = compiled_value - return dependencies, compiled_json_object - - def _compile_joined_str(self, joined_str: ast.JoinedStr) -> tuple[set[str], str]: - dependencies = set() - output = "" - - for value in joined_str.values: - if isinstance(value, ast.Constant): - output += value.value - continue - - if isinstance(value, ast.FormattedValue): - if value.conversion != -1: - raise RuntimeError( - f"Failed to compile {astor.to_source(joined_str)}Conversions are not supported." - ) - - if isinstance(value.value, ast.Name): - dependencies.add(value.value.id) - output += f"{{{{ {value.value.id} }}}}" - continue - - if isinstance(value.value, ast.Subscript): - variable_id, access_path = self._compile_subscript(value.value) - dependencies.add(variable_id) - output += access_path - continue - - raise RuntimeError( - f"Failed to compile {astor.to_source(joined_str)}Only variables and subscripts are supported in f-strings." - ) - - raise RuntimeError( - f"Failed to compile {astor.to_source(joined_str)}Unsupported value in f-string." - ) - - return dependencies, output - - def _get_assign_target(self, assign: ast.Assign) -> str: - # => we only support a single target - if len(assign.targets) != 1: - raise RuntimeError( - f"Failed to compile {astor.to_source(assign)}Only single target assignments are supported." - ) - target = assign.targets[0] - if not isinstance(target, ast.Name): - raise RuntimeError( - f"Failed to compile {astor.to_source(assign)}Only single target assignments are supported." - ) - return target.id - - def _compile_assign(self, assign: ast.Assign) -> None: - # Expect: NAME = CALL - - # Handle lhs: NAME - target_variable_id = self._get_assign_target(assign) - - # we might have an await statement. unwrap it - if isinstance(assign.value, ast.Await): - func_call = assign.value.value - else: - func_call = assign.value - - if not isinstance(func_call, ast.Call): - raise RuntimeError( - f"Failed to compile {astor.to_source(assign)}Assignments to variables are only supported for function calls." - ) - - # Compile Action Call - - action_node_id, dependencies = self._compile_call( - func_call, self.variable_to_nodes_mapping - ) - - # We must also consider the target variable as a dependency. - # Consider the following workflow: - # a = act1() - # a = act2() - # b = act3(a) - # - # If we would not add the target variable as a dependency, then act1 and act2 would be executed in parallel. - # This would lead to a race condition because act1 could be slower than act2 and overwrite its result. 
- # Hence, act3 would use the result of act1 instead of act2. - if target_variable_id in self.variable_to_nodes_mapping: - dependencies.add(target_variable_id) - - # Dependencies must be executed before the current action. Hence, add edges from dependencies to the current node. - if len(dependencies) > 0: - self._connect_node_to_dependencies( - action_node_id, dependencies, self.variable_to_nodes_mapping - ) - else: - # No dependencies, add edge from start node - self.nodes[START_NODE_ID].add_edge(action_node_id) - - self.nodes[action_node_id].result_name = target_variable_id - - # Remember the node for the target variable - # If future actions use target_variable_id then they depend on this node - self.variable_to_nodes_mapping[target_variable_id] = [ - (action_node_id, EdgeType.DEFAULT) - ] + return compiled_json_object def _compile_subscript(self, subscript: ast.Subscript) -> tuple[str, str]: """ @@ -582,19 +99,17 @@ def _compile_subscript(self, subscript: ast.Subscript) -> tuple[str, str]: def _compile_condition_expression( self, condition_expr: ast.expr, - ) -> tuple[Condition, set[str]]: + ) -> Condition: # AND / OR if isinstance(condition_expr, ast.BoolOp): - deps = set() compiled_values = [] for value in condition_expr.values: - expr, new_deps = self._compile_condition_expression(value) - deps |= new_deps + expr = self._compile_condition_expression(value) compiled_values.append(expr) if isinstance(condition_expr.op, ast.And): - return AndConditionExpression(and_expr=compiled_values), deps + return AndConditionExpression(and_expr=compiled_values) if isinstance(condition_expr.op, ast.Or): - return OrConditionExpression(or_expr=compiled_values), deps + return OrConditionExpression(or_expr=compiled_values) raise RuntimeError( f"Unsupported boolean operator: {astor.to_source(condition_expr)}" ) @@ -606,7 +121,7 @@ def _compile_condition_expression( f"Failed to compile {astor.to_source(condition_expr)}Only single comparison operations are supported." 
) - lhs, deps_lhs = self._compile_condition_expression(condition_expr.left) + lhs = self._compile_condition_expression(condition_expr.left) # LHS is not None / LHS is None if isinstance(condition_expr.ops[0], (ast.Is, ast.IsNot)): @@ -615,11 +130,9 @@ def _compile_condition_expression( if isinstance(condition_expr.ops[0], ast.Is) else UnaryOperator.IS_NOT_NONE ) - return UnaryConditionExpression(op=op, expr=lhs), deps_lhs + return UnaryConditionExpression(op=op, expr=lhs) - rhs, deps_rhs = self._compile_condition_expression( - condition_expr.comparators[0] - ) + rhs = self._compile_condition_expression(condition_expr.comparators[0]) op_node = condition_expr.ops[0] if isinstance(op_node, ast.Eq): @@ -643,9 +156,7 @@ def _compile_condition_expression( f"Unsupported comparison operator: {astor.to_source(op_node)}" ) - return BinaryConditionExpression( - lhs=lhs, op=op, rhs=rhs - ), deps_lhs | deps_rhs + return BinaryConditionExpression(lhs=lhs, op=op, rhs=rhs) # NOT if isinstance(condition_expr, ast.UnaryOp): @@ -655,305 +166,27 @@ def _compile_condition_expression( raise RuntimeError( f"Unsupported unary operator: {astor.to_source(condition_expr.op)}" ) - expr, deps = self._compile_condition_expression(condition_expr.operand) - return UnaryConditionExpression(op=op, expr=expr), deps + expr = self._compile_condition_expression(condition_expr.operand) + return UnaryConditionExpression(op=op, expr=expr) # VARIABLE if isinstance(condition_expr, ast.Name): - return ConstantConditionExpression( - value=f"{{{{ {condition_expr.id} }}}}" - ), {condition_expr.id} + return ConstantConditionExpression(value=f"{{{{ {condition_expr.id} }}}}") # JSON CONSTANT if isinstance(condition_expr, ast.Constant): - return ConstantConditionExpression(value=condition_expr.value), set() + return ConstantConditionExpression(value=condition_expr.value) # SUBSCRIPT/REFERENCE if isinstance(condition_expr, ast.Subscript): variable_id, access_path = self._compile_subscript(condition_expr) - return ConstantConditionExpression(value=access_path), {variable_id} + return ConstantConditionExpression(value=access_path) # LIST / DICT if isinstance(condition_expr, (ast.List, ast.Dict)): - deps, value = self._compile_value(condition_expr) - return ConstantConditionExpression(value=value), deps + value = self._compile_value(condition_expr) + return ConstantConditionExpression(value=value) raise RuntimeError( f"Unsupported condition expression: {astor.to_source(condition_expr)}" ) - - def _collect_leaves_and_emitted_variables( - self, if_node_id: str - ) -> tuple[list[tuple[str, EdgeType]], set[str]]: - # Perform BFS to collect all the leaf nodes and emitted variables - # NOTE: leaf nodes means that we collect the last action node of each possible branch within the if-condition block - - leaf_nodes = set() - emitted_variables = set() - - queue = deque([if_node_id]) - - while len(queue) > 0: - current_node_id = queue.popleft() - current_node = self.nodes[current_node_id] - - if isinstance(current_node, ActionNode): - if current_node.result_name is not None: - emitted_variables.add(current_node.result_name) - - if len(current_node.children) == 0: - # leaf node - leaf_nodes.add((current_node_id, EdgeType.DEFAULT)) - else: - queue.extend(current_node.children) - - else: - # IfNode - if len(current_node.false_children) > 0: - queue.extend(current_node.false_children) - else: - # if the branch is empty, we still add (if_node_if, EdgeType.TRUE) to the leaf nodes - # because if an action has a dependency on the if-condition, then it must 
always be - # executed after the if-condition block. - # - # Consider the following example: - # a = act1() - # if a > 0: - # a = act2() - # b = act3(a) - # - # If we would not add (if_node_id, EdgeType.FALSE) to the leaf nodes, then graph would look like: - # - # - # | - # a = act1() - - # | | - # if a > 0 | - # T| | - # a = act2() | - # \ / - # b = act3(a) - # - # This would mean that we would need to keep a longer history of the variables, so that we can - # connect act3(a) to a = act1() - # But if we add (if_node_id, EdgeType.FALSE) to the leaf nodes, then the graph would look like: - # - # - # | - # a = act1() - # | - # if a > 0 - # T/ \F - # a = act2() | - # \ | - # b = act3(a) - # - # This way, we can connect act3(a) to if a > 0, which is much simpler since we just remember - # the leaves from the if-condition block as a representative for a. - leaf_nodes.add((current_node_id, EdgeType.FALSE)) - - if len(current_node.true_children) > 0: - queue.extend(current_node.true_children) - else: - # Same as for the false branch - leaf_nodes.add((current_node_id, EdgeType.TRUE)) - - return list(leaf_nodes), emitted_variables - - def _compile_if_branch( - self, - statements: list[ast.stmt], - variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]], - add_edge_to_if_node: Callable[..., None], - ) -> set[str]: - if_block_dependencies = set() - - if_branch_emitted_variables = set() - local_variable_to_nodes_mapping = deepcopy(variable_to_nodes_mapping) - - for statement in statements: - if isinstance(statement, (ast.Assign, ast.Expr)): - # Expected: NAME = CALL or CALL - - if isinstance(statement.value, ast.Constant): - # Ignore constants. Example: multi-line comments - continue - - # we might have an await statement. unwrap it - if isinstance(statement.value, ast.Await): - # TODO: enable async workflows later - # func_call = statement.value.value - raise RuntimeError( - f"Failed to compile {astor.to_source(statement)}Async functions are not yet supported." - ) - else: - func_call = statement.value - - if not isinstance(func_call, ast.Call): - raise RuntimeError( - f"Failed to compile {astor.to_source(func_call)}Unsupported expression." 
- ) - - # Compile Action Call - action_node_id, dependencies = self._compile_call( - func_call, local_variable_to_nodes_mapping - ) - - # Remember the dependencies from before the if-condition - # => required for the edges to the if-condition node - if_block_dependencies |= dependencies - if_branch_emitted_variables - - # For inside the branch, we only need to consider the dependencies that were - # emitted within the branch - dependencies = dependencies & if_branch_emitted_variables - - if len(dependencies) > 0: - self._connect_node_to_dependencies( - action_node_id, dependencies, local_variable_to_nodes_mapping - ) - else: - # No new dependencies from inside the branch, add edge from if-condition node - add_edge_to_if_node(action_node_id) - - # Handle assignment target - # only required for NAME = CALL - if isinstance(statement, ast.Assign): - target_variable_id = self._get_assign_target(statement) - self.nodes[action_node_id].result_name = target_variable_id - local_variable_to_nodes_mapping[target_variable_id] = [ - (action_node_id, EdgeType.DEFAULT) - ] - if_branch_emitted_variables.add( - self.nodes[action_node_id].result_name - ) - - elif isinstance(statement, ast.If): - # Nested If-Condition - - ( - nested_if_cond_node_id, - nested_if_emitted_variables, - nested_if_leaf_nodes, - nested_if_block_deps, - ) = self._compile_if(statement, local_variable_to_nodes_mapping) - - # Connect the nested if-condition node to the dependencies of the nested if-condition block - dependencies = if_branch_emitted_variables & nested_if_block_deps - if len(dependencies) > 0: - self._connect_node_to_dependencies( - nested_if_cond_node_id, - dependencies, - local_variable_to_nodes_mapping, - ) - else: - # No new dependencies from inside the branch, add direct edge from if-condition node to nested if-condition node - add_edge_to_if_node(nested_if_cond_node_id) - - # update local_variable_to_nodes_mapping - # => the emitted variables are represented by the leaves of the nested if-condition block - for emitted_variable in nested_if_emitted_variables: - local_variable_to_nodes_mapping[emitted_variable] = ( - nested_if_leaf_nodes - ) - - # push the emitted variables to the if_branch_emitted_variables because - # now the if-branch also emits the variables from the nested if-condition block - if_branch_emitted_variables |= nested_if_emitted_variables - - else: - raise RuntimeError( - f"Unsupported statement in workflow function: {astor.to_source(statement)}" - ) - - return if_block_dependencies - - def _compile_if( - self, - if_ast: ast.If, - variable_to_nodes_mapping: dict[str, list[tuple[str, EdgeType]]], - ) -> tuple[int, set[str], list[tuple[str, EdgeType]], set[str]]: - prev_variables_to_nodes_mapping = deepcopy(variable_to_nodes_mapping) - - # compile condition + collect dependencies from the condition - if_condition, if_block_dependencies = self._compile_condition_expression( - if_ast.test - ) - if_condition_str = condition_to_str(if_ast.test) - - if_node_id = self._compute_unique_node_id("if") - if_node = IfNode( - id=if_node_id, condition=if_condition, condition_str=if_condition_str - ) - self.nodes[if_node_id] = if_node - - # Compile true branch - if_block_dependencies |= self._compile_if_branch( - if_ast.body, variable_to_nodes_mapping, if_node.add_true_edge - ) - # Compile false branch - if_block_dependencies |= self._compile_if_branch( - if_ast.orelse, variable_to_nodes_mapping, if_node.add_false_edge - ) - - # remove transitive dependencies from if_block_dependencies - - # We need to remove 
transitive dependencies from the statements before the if-block to avoid - # connectng the if-block to redundant edges in the DAG. - # - # Consider the following example: - # a = act1() - # b = act2(a) - # if a > 0 and b < 0: - # ... - # - # Then, a and b are both dependencies for the if-block. But since b depends on a, we can - # remove the edge from a to the if-condition node because the if-condition node already - # implicilty depends on a due to the dependency on b. - # - # NOTE: we need to use the initial variable_to_nodes_mapping because nested if-conditions - # might have overwritten the variable_to_nodes_mapping. - if_block_dependencies = self._remove_transitive_dependencies( - if_block_dependencies, prev_variables_to_nodes_mapping - ) - - # Collect leaf nodes and emitted variables from if-condition block - leaf_nodes, emitted_variables = self._collect_leaves_and_emitted_variables( - if_node_id - ) - - return if_node_id, emitted_variables, leaf_nodes, if_block_dependencies - - def _compile_expr(self, expr: ast.Expr) -> None: - # Expect: CALL - - if isinstance(expr.value, ast.Constant): - # Ignore constants. Example: multi-line comments - return - - # we might have an await statement. unwrap it - if isinstance(expr.value, ast.Await): - # TODO: enable async workflows later - # func_call = expr.value.value - raise RuntimeError( - f"Failed to compile {astor.to_source(expr)}Async functions are not yet supported." - ) - else: - func_call = expr.value - - if not isinstance(func_call, ast.Call): - raise RuntimeError( - f"Failed to compile {astor.to_source(func_call)}Unsupported expression." - ) - - # Compile Action Call - action_node_id, dependencies = self._compile_call( - func_call, self.variable_to_nodes_mapping - ) - # Dependencies must be executed before the current action. Hence, add edges from dependencies to the current node. - if len(dependencies) > 0: - self._connect_node_to_dependencies( - action_node_id, dependencies, self.variable_to_nodes_mapping - ) - else: - # No dependencies, add edge from start node - self.nodes[START_NODE_ID].add_edge(action_node_id) diff --git a/admyral/compiler/yaml_workflow_compiler.py b/admyral/compiler/yaml_workflow_compiler.py new file mode 100644 index 00000000..ae34365e --- /dev/null +++ b/admyral/compiler/yaml_workflow_compiler.py @@ -0,0 +1,154 @@ +from typing import Any +import yaml +import re + +from admyral.models import WorkflowDAG, WorkflowTriggerType, ActionNode +from admyral.action_registry import ActionRegistry +from admyral.db.store_interface import StoreInterface + + +SNAKE_CASE_REGEX = re.compile(r"^[a-z]+(_[a-z]+)*$") +VALID_WORKFLOW_NAME_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9 _]*$") + + +def has_cycle(workflow: WorkflowDAG) -> bool: + def dfs(node_id: str, visited: set[str]) -> bool: + if node_id in visited: + return True + visited.add(node_id) + node = workflow.dag[node_id] + children = ( + node.children + if isinstance(node, ActionNode) + else node.true_children + node.false_children + ) + for child_id in children: + if dfs(child_id, visited): + return True + visited.remove(node_id) + return False + + return dfs("start", set()) + + +async def validate_workflow( + user_id: str, db: StoreInterface, workflow: WorkflowDAG +) -> None: + if workflow.version != "1": + raise ValueError("Valid workflow schema versions: 1.") + + if not VALID_WORKFLOW_NAME_REGEX.match(workflow.name): + raise ValueError( + "Invalid workflow name. 
Workflow names must start with a letter and can only contain alphanumeric characters, underscores, and spaces.", + ) + + # verify that only one webhook trigger is present + if ( + len( + [ + trigger + for trigger in workflow.start.triggers + if trigger.type == WorkflowTriggerType.WEBHOOK + ] + ) + > 1 + ): + raise ValueError("At most one webhook trigger is allowed.") + + # validate schedule triggers: only one of cron, interval_seconds, interval_minutes, interval_hours, interval_days is set + if any( + len( + list( + filter( + lambda x: x is not None, + [ + trigger.cron, + trigger.interval_seconds, + trigger.interval_minutes, + trigger.interval_hours, + trigger.interval_days, + ], + ) + ) + ) + > 1 + for trigger in workflow.start.triggers + if trigger.type == WorkflowTriggerType.SCHEDULE + ): + raise ValueError( + "At most one schedule type (cron, interval seconds, etc.) per schedule trigger is allowed." + ) + + # workflow must have exactly one start node + start_nodes = [node for node in workflow.dag.values() if node.type == "start"] + if len(start_nodes) != 1: + raise ValueError("There must be exactly one start node.") + if start_nodes[0].id != "start": + raise ValueError("The start node must have id 'start'.") + if start_nodes[0].result_name != "payload": + raise ValueError("The start node must have result_name 'payload'.") + + # result name must be in snake case + if not all( + node.result_name is None + or node.result_name == "" + or SNAKE_CASE_REGEX.match(node.result_name) + for node in workflow.dag.values() + if isinstance(node, ActionNode) + ): + raise ValueError( + "If a result name is provided, then the result name must be in snake_case." + ) + + # check all node IDs are unique + node_id_and_dag_key_mismatches = [ + (dag_key, node.id) + for (dag_key, node) in workflow.dag.items() + if dag_key != node.id + ] + if len(node_id_and_dag_key_mismatches) > 0: + raise ValueError( + f"The following node IDs do not match their DAG keys: {node_id_and_dag_key_mismatches}" + ) + + # check that all children are valid node IDs + for node in workflow.dag.values(): + children = ( + node.children + if isinstance(node, ActionNode) + else node.true_children + node.false_children + ) + for child_id in children: + # start node must not have incoming edges + if child_id == "start": + raise ValueError("Start node cannot be a child of any node.") + if child_id not in workflow.dag: + raise ValueError(f"Child node ID '{child_id}' not found.") + + # check whether the action types are valid + for node in workflow.dag.values(): + if node.type == "start" or node.type == "if_condition": + continue + + # check action registry + if ActionRegistry.is_registered(node.type): + continue + + # check database for custom actions + if await db.get_action(user_id, node.type): + continue + + raise ValueError(f"Invalid action '{node.type}'.") + + # check for cycles + if has_cycle(workflow): + raise ValueError("Cycles are not allowed for workflows.") + + +def compile_from_yaml_workflow(yaml_workflow_str: str) -> WorkflowDAG: + yaml_workflow_dict = yaml.safe_load(yaml_workflow_str) + return WorkflowDAG.model_validate(yaml_workflow_dict) + + +def decompile_workflow_to_yaml(workflow_dag: WorkflowDAG) -> dict[str, Any]: + return workflow_dag.model_dump() diff --git a/admyral/models/workflow.py b/admyral/models/workflow.py index dd177e71..142d38c8 100644 --- a/admyral/models/workflow.py +++ b/admyral/models/workflow.py @@ -45,7 +45,7 @@ def __eq__(self, value: object) -> bool: @classmethod def build_start_node(cls) -> 
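Taken together, the new module gives a small YAML-in/YAML-out pipeline. The sketch below round-trips a minimal, made-up workflow (modeled on the example YAML files added later in this PR) through compile_from_yaml_workflow, validate_workflow, has_cycle, and decompile_workflow_to_yaml; the in-memory store stub and user id are assumptions standing in for a real StoreInterface.

# Hedged sketch: compile a minimal YAML workflow, validate it, and dump it back.
# The workflow content, store stub, and user id are invented for this example.
import asyncio

from admyral.compiler.yaml_workflow_compiler import (
    compile_from_yaml_workflow,
    decompile_workflow_to_yaml,
    has_cycle,
    validate_workflow,
)
from admyral.models import ActionNode

MINIMAL_WORKFLOW = """
name: minimal_example
description: Smallest possible workflow
controls: null
version: "1"
start:
  triggers: []
dag:
  start:
    id: start
    type: start
    result_name: payload
    args: {}
    secrets_mapping: {}
    children: []
"""


class _EmptyStore:
    # Stand-in for StoreInterface; validate_workflow only calls get_action.
    async def get_action(self, user_id, action_type):
        return None


dag = compile_from_yaml_workflow(MINIMAL_WORKFLOW)
asyncio.run(validate_workflow("user-123", _EmptyStore(), dag))
assert not has_cycle(dag)

# The start node carries result_name="payload", matching the updated
# ActionNode.build_start_node() further down and the check in validate_workflow.
print(ActionNode.build_start_node().result_name)  # payload
print(decompile_workflow_to_yaml(dag))            # plain dict, ready for yaml.safe_dump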
"ActionNode": - return cls(id="start", type="start") + return cls(id="start", type="start", result_name="payload") class IfNode(NodeBase): @@ -53,10 +53,6 @@ class IfNode(NodeBase): condition: Condition = Field(..., discriminator="type") condition_str: str - # Children must be sets - # Why? - # if we have act5(a, b) and a, b are emitted variables from an if-condition block - # then we connect each leaf node twice to act5(a, b) (once for a and once for b) true_children: list[str] = [] false_children: list[str] = [] @@ -99,7 +95,7 @@ class WorkflowDefaultArgument(BaseModel): class WorkflowTriggerBase(BaseModel): type: WorkflowTriggerType - default_args: list[WorkflowDefaultArgument] + default_args: list[WorkflowDefaultArgument] = [] @property def default_args_dict(self) -> dict[str, JsonValue]: @@ -156,7 +152,7 @@ class Workflow(BaseModel): class WorkflowPushRequest(BaseModel): - workflow_dag: WorkflowDAG + workflow_code: str activate: bool diff --git a/admyral/server/endpoints/editor_endpoints.py b/admyral/server/endpoints/editor_endpoints.py index 28c5c678..82691b2e 100644 --- a/admyral/server/endpoints/editor_endpoints.py +++ b/admyral/server/endpoints/editor_endpoints.py @@ -10,7 +10,6 @@ ActionNamespace, EditorActions, EditorWorkflowGraph, - WorkflowPushRequest, WorkflowPushResponse, ) from admyral.editor import ( @@ -19,6 +18,7 @@ ) from admyral.server.endpoints.workflow_endpoints import push_workflow_impl from admyral.server.auth import authenticate +from admyral.compiler.yaml_workflow_compiler import validate_workflow VALID_WORKFLOW_NAME_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9 _]*$") @@ -119,9 +119,8 @@ async def save_workflow_from_react_flow_graph( user_id=authenticated_user.user_id, workflow_name=workflow.workflow_name, workflow_id=workflow.workflow_id, - request=WorkflowPushRequest( - workflow_dag=workflow.workflow_dag, activate=workflow.is_active - ), + workflow_dag=workflow.workflow_dag, + activate=workflow.is_active, ) @@ -133,25 +132,21 @@ async def create_workflow_from_react_flow_graph( """ Create a new workflow from a ReactFlow graph. """ - if not VALID_WORKFLOW_NAME_REGEX.match(editor_workflow_graph.workflow_name): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid workflow name. Workflow names must start with a letter and can only contain alphanumeric characters, underscores, and spaces.", - ) + store = get_admyral_store() - if any( - not SNAKE_CASE_REGEX.match(node.result_name) - for node in editor_workflow_graph.nodes - if node.type == "action" and node.result_name is not None - ): + workflow = editor_workflow_graph_to_workflow(editor_workflow_graph) + try: + await validate_workflow( + authenticated_user.user_id, store, workflow.workflow_dag + ) + except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid node result name. 
Result names must start with a letter and can only contain alphanumeric characters and underscores.", - ) + detail=str(e), + ) from e - workflow = editor_workflow_graph_to_workflow(editor_workflow_graph) try: - await get_admyral_store().create_workflow( + await store.create_workflow( user_id=authenticated_user.user_id, workflow=workflow ) except ValueError as e: diff --git a/admyral/server/endpoints/workflow_endpoints.py b/admyral/server/endpoints/workflow_endpoints.py index c8197bb1..df2b4858 100644 --- a/admyral/server/endpoints/workflow_endpoints.py +++ b/admyral/server/endpoints/workflow_endpoints.py @@ -2,7 +2,6 @@ from typing import Optional, Annotated from uuid import uuid4 from pydantic import BaseModel -import re from admyral.utils.collections import is_not_empty from admyral.server.deps import get_admyral_store, get_workers_client @@ -15,20 +14,21 @@ WorkflowSchedule, WorkflowTriggerResponse, WorkflowMetadata, - ActionNode, + WorkflowDAG, ) from admyral.logger import get_logger from admyral.typings import JsonValue from admyral.server.auth import authenticate +from admyral.compiler.yaml_workflow_compiler import ( + compile_from_yaml_workflow, + validate_workflow, +) logger = get_logger(__name__) MANUAL_TRIGGER_SOURCE_NAME = "manual" -SPACE = " " -VALID_WORKFLOW_NAME_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9 _]*$") -SNAKE_CASE_REGEX = re.compile(r"^[a-zA-Z][a-zA-Z0-9_]*$") class WorkflowBody(BaseModel): @@ -60,7 +60,8 @@ async def push_workflow_impl( user_id: str, workflow_name: str, workflow_id: str | None, - request: WorkflowPushRequest, + workflow_dag: WorkflowDAG, + activate: bool, ) -> WorkflowPushResponse: """ Push a workflow to the store. If the workflow for the provided workflow id already exists, @@ -71,21 +72,13 @@ async def push_workflow_impl( workflow: The workflow object. """ - if not VALID_WORKFLOW_NAME_REGEX.match(workflow_name): - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid workflow name. Workflow names must start with a letter and can only contain alphanumeric characters, underscores, and spaces.", - ) - - if any( - not SNAKE_CASE_REGEX.match(node.result_name) - for node in request.workflow_dag.dag.values() - if isinstance(node, ActionNode) and node.result_name is not None - ): + try: + await validate_workflow(user_id, get_admyral_store(), workflow_dag) + except ValueError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Invalid node result name. 
Result names must be snake_case.", - ) + detail=str(e), + ) from e admyral_store = get_admyral_store() workers_client = get_workers_client() @@ -116,8 +109,8 @@ async def push_workflow_impl( workflow = Workflow( workflow_id=workflow_id, workflow_name=workflow_name, - workflow_dag=request.workflow_dag, - is_active=request.activate, + workflow_dag=workflow_dag, + is_active=activate, ) await admyral_store.store_workflow(user_id, workflow) @@ -134,7 +127,7 @@ async def push_workflow_impl( new_schedules = list( filter( lambda trigger: trigger.type == WorkflowTriggerType.SCHEDULE, - request.workflow_dag.start.triggers, + workflow_dag.start.triggers, ) ) if is_not_empty(new_schedules): @@ -151,7 +144,7 @@ async def push_workflow_impl( ) await admyral_store.store_schedule(user_id, new_workflow_schedule) - if request.activate: + if activate: await workers_client.schedule_workflow( user_id, workflow, new_workflow_schedule ) @@ -160,7 +153,7 @@ async def push_workflow_impl( filtered_webhooks = list( filter( lambda trigger: trigger.type == WorkflowTriggerType.WEBHOOK, - request.workflow_dag.start.triggers, + workflow_dag.start.triggers, ) ) if len(filtered_webhooks) > 1: @@ -192,9 +185,8 @@ async def push_workflow_impl( router = APIRouter() -@router.post("/{workflow_name}/push", status_code=status.HTTP_201_CREATED) +@router.post("/push", status_code=status.HTTP_201_CREATED) async def push_workflow( - workflow_name: str, request: WorkflowPushRequest, authenticated_user: AuthenticatedUser = Depends(authenticate), ) -> WorkflowPushResponse: @@ -206,8 +198,10 @@ async def push_workflow( workflow_name: The workflow name. workflow: The workflow object. """ + workflow_dag = compile_from_yaml_workflow(request.workflow_code) + workflow_name = workflow_dag.name return await push_workflow_impl( - authenticated_user.user_id, workflow_name, None, request + authenticated_user.user_id, workflow_name, None, workflow_dag, request.activate ) diff --git a/admyral/workflow.py b/admyral/workflow.py deleted file mode 100644 index 2beb3032..00000000 --- a/admyral/workflow.py +++ /dev/null @@ -1,79 +0,0 @@ -from typing import TypeVar, Callable, Any - -from admyral.typings import JsonValue -from admyral.models import WorkflowDAG -from admyral.compiler.workflow_compiler import WorkflowCompiler - - -T = TypeVar("T", bound="Workflow") -F = TypeVar("F", bound=Callable[..., None]) - - -class Webhook: - def __init__(self, **default_args: dict[str, JsonValue]) -> None: - super().__init__() - self.default_args = default_args - - -class Schedule: - def __init__( - self, - cron: str | None = None, - interval_seconds: int | None = None, - interval_minutes: int | None = None, - interval_hours: int | None = None, - interval_days: int | None = None, - **default_args: dict[str, JsonValue], - ) -> None: - self.cron = cron - self.interval_seconds = interval_seconds - self.interval_minutes = interval_minutes - self.interval_hours = interval_hours - self.interval_days = interval_days - self.default_args = default_args - - -class Workflow: - def __init__( - self, - func: F, - *, - triggers: list[Webhook | Schedule] = [], - description: str | None = None, - controls: list[str] | None = None, - ) -> None: - self.func = func - self.description = description - self.triggers = triggers - self.controls = controls - - def compile(self) -> WorkflowDAG: - return WorkflowCompiler().compile(self) - - @property - def name(self) -> str: - return self.func.__name__ - - def __call__(self, payload: dict[str, JsonValue]) -> Any: - """Call the wrapped workflow 
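On the wire, the endpoint change above means clients now POST raw workflow code instead of a pre-compiled DAG. Below is a hedged sketch of the new contract from a plain HTTP client's point of view; the route and body fields come from this diff, while the base URL, auth header, and file path are placeholders.

# Assumption-heavy sketch of calling the relocated push endpoint directly.
import requests

from admyral.models import WorkflowPushRequest

BASE_URL = "http://localhost:8000/api/v1"  # placeholder, not taken from the repo

with open("workflows/my_workflow.yaml", "r") as f:  # placeholder path
    workflow_code = f.read()

body = WorkflowPushRequest(workflow_code=workflow_code, activate=False).model_dump()
response = requests.post(
    f"{BASE_URL}/workflows/push",
    json=body,
    headers={"Authorization": "Bearer <api-key>"},  # placeholder auth scheme
)
response.raise_for_status()
print(response.json())  # push response, e.g. webhook_id if a webhook trigger exists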
function.""" - return self.func(payload) - - -# TODO: type overload + correct return type -def workflow( - _func: F = None, - *, - description: str | None = None, - triggers: list[Webhook | Schedule] = [], - controls: list[str] | None = None, -) -> Workflow: - """Decorator to create a workflow.""" - - def inner(func: "F") -> Workflow: - w = Workflow( - func, description=description, triggers=triggers, controls=controls - ) - w.__doc__ = func.__doc__ - return w - - return inner if _func is None else inner(_func) diff --git a/examples/access_review/workflows/github_audit_logs_owner_changes/__init__.py b/examples/access_review/workflows/github_audit_logs_owner_changes/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/access_review/workflows/github_audit_logs_owner_changes.py b/examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.py similarity index 63% rename from examples/access_review/workflows/github_audit_logs_owner_changes.py rename to examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.py index 526306f6..bf6ab03b 100644 --- a/examples/access_review/workflows/github_audit_logs_owner_changes.py +++ b/examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.py @@ -1,13 +1,17 @@ +""" + +admyral action push get_time_range_of_last_full_hour -a workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.py +admyral action push build_info_message_owner_changes -a workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.py + +admyral workflow push workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.yaml + +""" + from typing import Annotated from datetime import datetime, timedelta, UTC -from admyral.workflow import workflow, Schedule from admyral.typings import JsonValue from admyral.action import action, ArgumentMetadata -from admyral.actions import ( - search_github_enterprise_audit_logs, - batched_send_slack_message_to_user_by_email, -) @action( @@ -56,30 +60,3 @@ def build_info_message_owner_changes( ) ) return messages - - -@workflow( - description="Alert on GitHub Orga Owner Changes", - triggers=[Schedule(cron="0 * * * *")], -) -def github_audit_logs_owner_changes(payload: dict[str, JsonValue]): - start_and_end_time = get_time_range_of_last_full_hour() - - logs = search_github_enterprise_audit_logs( - enterprise="admyral", # TODO: set your enterprise slug here - filter="action:org.update_member", - start_time=start_and_end_time[0], - end_time=start_and_end_time[1], - secrets={"GITHUB_ENTERPRISE_SECRET": "github_enterprise_secret"}, - ) - - if logs: - messages = build_info_message_owner_changes( - logs=logs, - email="daniel@admyral.dev", # TODO: set your Slack email here - ) - - batched_send_slack_message_to_user_by_email( - messages=messages, - secrets={"SLACK_SECRET": "slack_secret"}, - ) diff --git a/examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.yaml b/examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.yaml new file mode 100644 index 00000000..f8c2c1bd --- /dev/null +++ b/examples/access_review/workflows/github_audit_logs_owner_changes/github_audit_logs_owner_changes.yaml @@ -0,0 +1,72 @@ +controls: null +dag: + batched_send_slack_message_to_user_by_email: + args: + messages: "{{ messages }}" + children: [] + id: batched_send_slack_message_to_user_by_email + result_name: null + 
secrets_mapping: + SLACK_SECRET: slack_secret + type: batched_send_slack_message_to_user_by_email + build_info_message_owner_changes: + args: + email: daniel@admyral.dev + logs: "{{ logs }}" + children: + - batched_send_slack_message_to_user_by_email + id: build_info_message_owner_changes + result_name: messages + secrets_mapping: {} + type: build_info_message_owner_changes + get_time_range_of_last_full_hour: + args: {} + children: + - search_github_enterprise_audit_logs + id: get_time_range_of_last_full_hour + result_name: start_and_end_time + secrets_mapping: {} + type: get_time_range_of_last_full_hour + if: + condition: + type: constant + value: "{{ logs }}" + condition_str: logs + false_children: [] + id: if + true_children: + - build_info_message_owner_changes + type: if_condition + search_github_enterprise_audit_logs: + args: + end_time: "{{ start_and_end_time[1] }}" + enterprise: admyral + filter: action:org.update_member + start_time: "{{ start_and_end_time[0] }}" + children: + - if + id: search_github_enterprise_audit_logs + result_name: logs + secrets_mapping: + GITHUB_ENTERPRISE_SECRET: github_enterprise_secret + type: search_github_enterprise_audit_logs + start: + args: {} + children: + - get_time_range_of_last_full_hour + id: start + result_name: payload + secrets_mapping: {} + type: start +description: Alert on GitHub Orga Owner Changes +name: github_audit_logs_owner_changes +start: + triggers: + - cron: 0 * * * * + default_args: [] + interval_days: null + interval_hours: null + interval_minutes: null + interval_seconds: null + type: schedule +version: "1" diff --git a/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring.yaml b/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring.yaml new file mode 100644 index 00000000..cac6af74 --- /dev/null +++ b/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring.yaml @@ -0,0 +1,67 @@ +controls: null +dag: + get_time_interval_of_last_n_days: + args: + n_days: 1 + children: + - list_google_docs_revisions + id: get_time_interval_of_last_n_days + result_name: start_and_end_yesterday + secrets_mapping: {} + type: get_time_interval_of_last_n_days + if: + condition: + type: constant + value: "{{ revisions }}" + condition_str: revisions + false_children: [] + id: if + true_children: + - send_list_elements_to_workflow + type: if_condition + list_google_docs_revisions: + args: + end_time: "{{ start_and_end_yesterday[1] }}" + file_id: "{{ payload['file_id'] }}" + start_time: "{{ start_and_end_yesterday[0] }}" + children: + - if + id: list_google_docs_revisions + result_name: revisions + secrets_mapping: + GOOGLE_DRIVE_SECRET: google_drive_secret + type: list_google_docs_revisions + send_list_elements_to_workflow: + args: + elements: "{{ revisions }}" + shared_data: + file_id: "{{ payload['file_id'] }}" + latest_version: "{{ revisions[-1]['content'] }}" + workflow_name: google_docs_policy_revision_monitoring_body + children: [] + id: send_list_elements_to_workflow + result_name: null + secrets_mapping: {} + type: send_list_elements_to_workflow + start: + args: {} + children: + - get_time_interval_of_last_n_days + id: start + result_name: payload + secrets_mapping: {} + type: start +description: Monitor Google Docs Policy Revision +name: google_docs_policy_revision_monitoring +start: + triggers: + - cron: null + default_args: + - name: file_id + value: 1ozuJuLT2MOOlJX_DjQ8dAtbI7Xm03JTdp2QbaFU5caY + 
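The example workflows above and below use the two schedule flavors the trigger model supports: a cron expression and a day interval. A small sketch of the corresponding Pydantic objects follows; field names mirror the YAML files in this PR, and the default-argument value is a placeholder.

# Sketch: the schedule trigger shapes used by the example workflows.
from admyral.models import (
    WorkflowDefaultArgument,
    WorkflowScheduleTrigger,
    WorkflowTriggerType,
)

hourly = WorkflowScheduleTrigger(type=WorkflowTriggerType.SCHEDULE, cron="0 * * * *")
daily = WorkflowScheduleTrigger(
    type=WorkflowTriggerType.SCHEDULE,
    interval_days=1,
    default_args=[
        WorkflowDefaultArgument(name="file_id", value="<google docs file id>")
    ],
)
print(hourly.cron, daily.interval_days)  # 0 * * * * 1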
interval_days: 1 + interval_hours: null + interval_minutes: null + interval_seconds: null + type: schedule +version: "1" diff --git a/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring_body.yaml b/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring_body.yaml new file mode 100644 index 00000000..ea7a5bda --- /dev/null +++ b/examples/access_review/workflows/google_docs_policy_monitoring/google_docs_policy_revision_monitoring_body.yaml @@ -0,0 +1,173 @@ +controls: null +dag: + if: + condition: + lhs: + type: constant + value: "{{ is_material_change }}" + op: IN + rhs: + type: constant + value: + - "yes" + - "Yes" + type: binary + condition_str: is_material_change in ['yes', 'Yes'] + false_children: [] + id: if + true_children: + - split_text + - split_text_1 + type: if_condition + if_1: + condition: + lhs: + type: constant + value: "{{ is_modified_date_in_revision_history }}" + op: IN + rhs: + type: constant + value: + - "No" + - "no" + type: binary + condition_str: is_modified_date_in_revision_history in ['No', 'no'] + false_children: [] + id: if_1 + true_children: + - send_slack_message_to_user_by_email + type: if_condition + openai_chat_completion: + args: + model: gpt-4o + prompt: + "Carefully review the following diff of a Google Doc defining a company + policy: + + + + + {{ payload['element']['diff'] }} + + + + + Does the diff contain any material change? A material change of the policy + is, for example, if sections were added, removed, or the content was modified. + Some examples of changes which are not material changes are fixing typos, + changing the order of sections, uppercasing or lowercasing, changing a date, + or reformulating sentences while the meaning remains the same. Answer with + 'yes' if it is a material change and answer with 'no' otherwise. You must + only answer with 'yes' or 'no' and nothing else." + stop_tokens: + - " + + " + children: + - if + id: openai_chat_completion + result_name: is_material_change + secrets_mapping: + OPENAI_SECRET: openai_secret + type: openai_chat_completion + openai_chat_completion_1: + args: + model: gpt-4o + prompt: + "Carefully review the following revision history to check whether it + contains the date {{ modified_date }}. Be aware that the date could also be + in another format, e.g., 2024-02-01 could exist as 02/01/2024 or Feb 1, 2024! + Answer with 'yes' if the revision history contains the date and answer with + 'no' otherwise. You must only answer with 'yes' or 'no' and nothing + else. 
Here is the revision history which you should check for the date {{ + modified_date }}: + + + + + {{ revision_history }} + + + + " + stop_tokens: + - " + + " + children: + - if_1 + id: openai_chat_completion_1 + result_name: is_modified_date_in_revision_history + secrets_mapping: + OPENAI_SECRET: openai_secret + type: openai_chat_completion + send_slack_message_to_user_by_email: + args: + email: daniel@admyral.ai + text: + "Potential material change without revision history update detected in + Google Docs revision: + + Link: https://docs.google.com/document/d/{{ payload['shared']['file_id'] + }}/edit + + User: {{ payload['element']['lastModifyingUser'] }} + + Modification Time: {{ payload['element']['modifiedTime'] }}" + children: [] + id: send_slack_message_to_user_by_email + result_name: null + secrets_mapping: + SLACK_SECRET: slack_secret + type: send_slack_message_to_user_by_email + split_text: + args: + pattern: Revision history + text: "{{ payload['shared']['latest_version'] }}" + children: + - transform + id: split_text + result_name: splitted_text + secrets_mapping: {} + type: split_text + split_text_1: + args: + pattern: T + text: "{{ payload['element']['modifiedTime'] }}" + children: + - transform_1 + id: split_text_1 + result_name: modified_date + secrets_mapping: {} + type: split_text + start: + args: {} + children: + - openai_chat_completion + id: start + result_name: payload + secrets_mapping: {} + type: start + transform: + args: + value: "{{ splitted_text[-1] }}" + children: + - openai_chat_completion_1 + id: transform + result_name: revision_history + secrets_mapping: {} + type: transform + transform_1: + args: + value: "{{ modified_date[0] }}" + children: + - openai_chat_completion_1 + id: transform_1 + result_name: modified_date + secrets_mapping: {} + type: transform +description: Handle the single revisions for workflow "Review Google Docs Policy Revision" +name: google_docs_policy_revision_monitoring_body +start: + triggers: [] +version: "1" diff --git a/examples/access_review/workflows/google_docs_policy_revision_monitoring.py b/examples/access_review/workflows/google_docs_policy_revision_monitoring.py deleted file mode 100644 index d0184015..00000000 --- a/examples/access_review/workflows/google_docs_policy_revision_monitoring.py +++ /dev/null @@ -1,86 +0,0 @@ -from admyral.workflow import workflow, Schedule -from admyral.typings import JsonValue -from admyral.actions import ( - list_google_docs_revisions, - send_slack_message_to_user_by_email, - openai_chat_completion, - send_list_elements_to_workflow, - split_text, - transform, - get_time_interval_of_last_n_days, -) - - -@workflow( - description="Monitor Google Docs Policy Revision", - triggers=[ - Schedule( - interval_days=1, - file_id="1ozuJuLT2MOOlJX_DjQ8dAtbI7Xm03JTdp2QbaFU5caY", # TODO: place your Google Docs file ID here - ) - ], -) -def google_docs_policy_revision_monitoring(payload: dict[str, JsonValue]): - start_and_end_yesterday = get_time_interval_of_last_n_days(n_days=1) - - revisions = list_google_docs_revisions( - file_id=payload["file_id"], - start_time=start_and_end_yesterday[0], - end_time=start_and_end_yesterday[1], - secrets={"GOOGLE_DRIVE_SECRET": "google_drive_secret"}, - ) - - if revisions: - send_list_elements_to_workflow( - workflow_name="google_docs_policy_revision_monitoring_body", - elements=revisions, - shared_data={ - "file_id": payload["file_id"], - "latest_version": revisions[-1]["content"], - }, - ) - - -@workflow( - description='Handle the single revisions for workflow "Review 
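Reading the parent and body workflows together, the body workflow's references (payload['element'], payload['shared']) imply a per-revision payload shape like the one sketched below. This is an inference from the YAML above, not something defined in the diff, and all concrete values are placeholders.

# Hypothetical payload for one run of google_docs_policy_revision_monitoring_body,
# inferred from the references in the YAML; values are placeholders.
payload = {
    "element": {  # one entry of the `revisions` list sent by the parent workflow
        "diff": "<diff of this revision>",
        "modifiedTime": "2024-02-01T10:00:00.000Z",
        "lastModifyingUser": "<display name of the editing user>",
    },
    "shared": {  # the shared_data passed via send_list_elements_to_workflow
        "file_id": "<google docs file id>",
        "latest_version": "<full text of the newest revision>",
    },
}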
Google Docs Policy Revision"', -) -def google_docs_policy_revision_monitoring_body(payload: dict[str, JsonValue]): - is_material_change = openai_chat_completion( - model="gpt-4o", - prompt=f"Carefully review the following diff of a Google Doc defining a company policy:\n\n\n{payload["element"]["diff"]}\n\n\n" - "Does the diff contain any material change? A material change of the policy is, for example, if sections were added, removed, or the content was modified. Some examples of " - "changes which are not material changes are fixing typos, changing the order of sections, uppercasing or lowercasing, changing a date, or reformulating sentences while the " - "meaning remains the same. Answer with 'yes' if it is a material change and answer with 'no' otherwise. You must only answer with 'yes' or 'no' and nothing else.", - stop_tokens=["\n"], - secrets={"OPENAI_SECRET": "openai_secret"}, - ) - - if is_material_change in ["yes", "Yes"]: - # Note: we use the latest version of the document to check for the revision history - splitted_text = split_text( - text=payload["shared"]["latest_version"], pattern="Revision history" - ) - revision_history = transform(value=splitted_text[-1]) - - modified_date = split_text(text=payload["element"]["modifiedTime"], pattern="T") - modified_date = transform(value=modified_date[0]) - - is_modified_date_in_revision_history = openai_chat_completion( - model="gpt-4o", - prompt=f"Carefully review the following revision history to check whether it contains the date {modified_date}. Be aware that the date could also be in another format, e.g., " - f"2024-02-01 could exist as 02/01/2024 or Feb 1, 2024! Answer with 'yes' if the revision history contains the date and answer with 'no' otherwise. You must only answer with " - f"'yes' or 'no' and nothing else. 
Here is the revision history which you should check for the date {modified_date}:\n\n" - f"\n{revision_history}\n\n", - stop_tokens=["\n"], - secrets={"OPENAI_SECRET": "openai_secret"}, - ) - - if is_modified_date_in_revision_history in ["No", "no"]: - send_slack_message_to_user_by_email( - email="daniel@admyral.ai", - text=f"Potential material change without revision history update detected in Google Docs revision:\n" - f"Link: https://docs.google.com/document/d/{payload["shared"]["file_id"]}/edit\n" - f"User: {payload["element"]["lastModifyingUser"]}\n" - f"Modification Time: {payload["element"]["modifiedTime"]}", - secrets={"SLACK_SECRET": "slack_secret"}, - ) diff --git a/examples/access_review/workflows/google_drive_public_link_sharing_docs.py b/examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.py similarity index 62% rename from examples/access_review/workflows/google_drive_public_link_sharing_docs.py rename to examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.py index 11f6f3b0..b187593d 100644 --- a/examples/access_review/workflows/google_drive_public_link_sharing_docs.py +++ b/examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.py @@ -1,7 +1,7 @@ """ -admyral action push transform_google_drive_public_link_sharing_docs -a workflows/google_drive_public_link_sharing_docs.py -admyral workflow push google_drive_public_link_sharing_docs -f workflows/google_drive_public_link_sharing_docs.py --activate +admyral action push transform_google_drive_public_link_sharing_docs -a workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.py +admyral workflow push workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.yaml --activate """ @@ -9,13 +9,7 @@ from collections import defaultdict from admyral.action import action, ArgumentMetadata -from admyral.workflow import workflow from admyral.typings import JsonValue -from admyral.actions import ( - list_google_drive_files_with_link_sharing_enabled, - batched_send_slack_message_to_user_by_email, - send_slack_message_to_user_by_email, -) @action( @@ -94,35 +88,3 @@ def transform_google_drive_public_link_sharing_docs( ) return slack_messages - - -@workflow( - description="Ask users whether the files they own in Google Drive with public link sharing enabled should be really public.", -) -def google_drive_public_link_sharing_docs(payload: dict[str, JsonValue]): - public_files = list_google_drive_files_with_link_sharing_enabled( - customer_id="d43sg123m", - admin_email="daniel@admyral.ai", - secrets={"GOOGLE_DRIVE_SECRET": "google_drive_secret"}, - ) - - # group by user and also group by no user and transform the files - public_files_slack_messages = transform_google_drive_public_link_sharing_docs( - public_files=public_files, - user_message="Please review the following public files in Google Drive. 
Are you sure they should be public?", - organization_domains=["@admyral.ai"], # TODO: update - ) - - # send slack message to each owner - batched_send_slack_message_to_user_by_email( - messages=public_files_slack_messages["owner"], - secrets={"SLACK_SECRET": "slack_secret"}, - ) - - # send slack message to compliance for all the files - # which do not have an owner - send_slack_message_to_user_by_email( - email="daniel@admyral.ai", # TODO: update - text=public_files_slack_messages["no_owner"], - secrets={"SLACK_SECRET": "slack_secret"}, - ) diff --git a/examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.yaml b/examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.yaml new file mode 100644 index 00000000..32a655eb --- /dev/null +++ b/examples/access_review/workflows/google_drive_public_link_sharing/google_drive_public_link_sharing_docs.yaml @@ -0,0 +1,60 @@ +controls: null +dag: + batched_send_slack_message_to_user_by_email: + args: + messages: '{{ public_files_slack_messages[''owner''] }}' + children: [] + id: batched_send_slack_message_to_user_by_email + result_name: null + secrets_mapping: + SLACK_SECRET: slack_secret + type: batched_send_slack_message_to_user_by_email + list_google_drive_files_with_link_sharing_enabled: + args: + admin_email: daniel@admyral.ai + customer_id: d43sg123m + children: + - transform_google_drive_public_link_sharing_docs + id: list_google_drive_files_with_link_sharing_enabled + result_name: public_files + secrets_mapping: + GOOGLE_DRIVE_SECRET: google_drive_secret + type: list_google_drive_files_with_link_sharing_enabled + send_slack_message_to_user_by_email: + args: + email: daniel@admyral.ai + text: '{{ public_files_slack_messages[''no_owner''] }}' + children: [] + id: send_slack_message_to_user_by_email + result_name: null + secrets_mapping: + SLACK_SECRET: slack_secret + type: send_slack_message_to_user_by_email + start: + args: {} + children: + - list_google_drive_files_with_link_sharing_enabled + id: start + result_name: payload + secrets_mapping: {} + type: start + transform_google_drive_public_link_sharing_docs: + args: + organization_domains: + - '@admyral.ai' + public_files: '{{ public_files }}' + user_message: Please review the following public files in Google Drive. Are + you sure they should be public? + children: + - batched_send_slack_message_to_user_by_email + - send_slack_message_to_user_by_email + id: transform_google_drive_public_link_sharing_docs + result_name: public_files_slack_messages + secrets_mapping: {} + type: transform_google_drive_public_link_sharing_docs +description: Ask users whether the files they own in Google Drive with public link + sharing enabled should be really public. 
+name: google_drive_public_link_sharing_docs +start: + triggers: [] +version: '1' diff --git a/examples/access_review/workflows/kandji_alert_for_unencrypted_devices.yaml b/examples/access_review/workflows/kandji_alert_for_unencrypted_devices.yaml new file mode 100644 index 00000000..d8fba6d9 --- /dev/null +++ b/examples/access_review/workflows/kandji_alert_for_unencrypted_devices.yaml @@ -0,0 +1,79 @@ +controls: null +dag: + format_json_to_list_view_string: + args: + json_value: "{{ selected_fields }}" + children: + - send_slack_message + id: format_json_to_list_view_string + result_name: formatted_string + secrets_mapping: {} + type: format_json_to_list_view_string + if: + condition: + type: constant + value: "{{ unencrypted_devices }}" + condition_str: unencrypted_devices + false_children: [] + id: if + true_children: + - select_fields_from_objects_in_list + type: if_condition + list_kandji_devices: + args: + blueprints: + - Default Blueprint + filevault_enabled: false + last_checkin_within_days: 90 + platform: Mac + children: + - if + id: list_kandji_devices + result_name: unencrypted_devices + secrets_mapping: + KANDJI_SECRET: kandji_secret + type: list_kandji_devices + select_fields_from_objects_in_list: + args: + fields: + - device_name + - device_id + input_list: "{{ unencrypted_devices }}" + children: + - format_json_to_list_view_string + id: select_fields_from_objects_in_list + result_name: selected_fields + secrets_mapping: {} + type: select_fields_from_objects_in_list + send_slack_message: + args: + channel_id: C06QP0KV1L2 + text: + "\U0001F6A8 Unencrypted devices detected \U0001F6A8\n\n{{ formatted_string\ + \ }}" + children: [] + id: send_slack_message + result_name: null + secrets_mapping: + SLACK_SECRET: slack_secret + type: send_slack_message + start: + args: {} + children: + - list_kandji_devices + id: start + result_name: payload + secrets_mapping: {} + type: start +description: Alert for unencrypted managed devices in Kandji via Slack +name: kandji_alert_for_unencrypted_devices +start: + triggers: + - cron: null + default_args: [] + interval_days: 7 + interval_hours: null + interval_minutes: null + interval_seconds: null + type: schedule +version: "1" diff --git a/examples/access_review/workflows/kandji_device_information.py b/examples/access_review/workflows/kandji_device_information.py deleted file mode 100644 index c57bc885..00000000 --- a/examples/access_review/workflows/kandji_device_information.py +++ /dev/null @@ -1,108 +0,0 @@ -""" - -admyral action push transform_kandji_devices_information -a workflows/kandji_device_information.py -admyral workflow push kandji_device_information -f workflows/kandji_device_information.py --activate - -Required Kandji Permissions: -- Application Firewall -- Desktop & Screensaver -- View Library Item Status -- Device list - -""" - -from typing import Annotated - -from admyral.workflow import workflow, Schedule -from admyral.typings import JsonValue -from admyral.actions import ( - list_kandji_devices, - get_kandji_application_firewall, - get_kandji_desktop_and_screensaver, - join_lists, - format_json_to_list_view_string, - send_slack_message, - select_fields_from_objects_in_list, -) -from admyral.action import action, ArgumentMetadata - - -@action( - display_name="Transform Kandji Devices Information", - display_namespace="Kandji", - description="Transforms Kandji devices information.", -) -def transform_kandji_devices_information( - devices: Annotated[ - list[dict[str, JsonValue]], - ArgumentMetadata( - display_name="Devices", - 
description="Kandji devices information.", - ), - ], -) -> list[dict[str, JsonValue]]: - formatted = [] - for device in devices: - formatted_device = { - "Device ID": device["device_id"], - "Device Name": device["device_name"], - "Platform": device["platform"], - "OS Version": device["os_version"], - "Application Firewall Activated": device["application_firewall_status"], - "Lock Screensaver Interval": device[ - "desktop_and_screensaver_screensaver_interval" - ], - } - formatted.append(formatted_device) - return formatted - - -@workflow( - description="Alert for unencrypted managed devices in Kandji via Slack", - triggers=[Schedule(interval_days=1)], -) -def kandji_device_information(payload: dict[str, JsonValue]): - # get OS version - devices = list_kandji_devices(secrets={"KANDJI_SECRET": "kandji_secret"}) - devices = select_fields_from_objects_in_list( - input_list=devices, - fields=["device_id", "device_name", "platform", "os_version"], - ) - - device_status_application_firewall = get_kandji_application_firewall( - secrets={"KANDJI_SECRET": "kandji_secret"}, - ) - device_status_application_firewall = select_fields_from_objects_in_list( - input_list=device_status_application_firewall, - fields=["device_id", "status"], - ) - result = join_lists( - list1=devices, - list1_join_key_paths=[["device_id"]], - list2=device_status_application_firewall, - list2_join_key_paths=[["device_id"]], - key_prefix_list2="application_firewall_", - ) - - device_status_desktop_and_screensaver = get_kandji_desktop_and_screensaver( - secrets={"KANDJI_SECRET": "kandji_secret"} - ) - device_status_desktop_and_screensaver = select_fields_from_objects_in_list( - input_list=device_status_desktop_and_screensaver, - fields=["device_id", "screensaver_interval"], - ) - result = join_lists( - list1=result, - list1_join_key_paths=[["device_id"]], - list2=device_status_desktop_and_screensaver, - list2_join_key_paths=[["device_id"]], - key_prefix_list2="desktop_and_screensaver_", - ) - - formatted_devices = transform_kandji_devices_information(devices=result) - message = format_json_to_list_view_string(json_value=formatted_devices) - send_slack_message( - channel_id="C06QP0KV1L2", # TODO: set your channel id here - text=f"Kandji Devices Information:\n{message}", - secrets={"SLACK_SECRET": "slack_secret"}, - ) diff --git a/examples/access_review/workflows/kandji_device_information/kandji_device_information.py b/examples/access_review/workflows/kandji_device_information/kandji_device_information.py new file mode 100644 index 00000000..d8eb7b6a --- /dev/null +++ b/examples/access_review/workflows/kandji_device_information/kandji_device_information.py @@ -0,0 +1,47 @@ +""" + +admyral action push transform_kandji_devices_information -a workflows/kandji_device_information.py +admyral workflow push workflows/kandji_device_information.py --activate + +Required Kandji Permissions: +- Application Firewall +- Desktop & Screensaver +- View Library Item Status +- Device list + +""" + +from typing import Annotated + +from admyral.typings import JsonValue +from admyral.action import action, ArgumentMetadata + + +@action( + display_name="Transform Kandji Devices Information", + display_namespace="Kandji", + description="Transforms Kandji devices information.", +) +def transform_kandji_devices_information( + devices: Annotated[ + list[dict[str, JsonValue]], + ArgumentMetadata( + display_name="Devices", + description="Kandji devices information.", + ), + ], +) -> list[dict[str, JsonValue]]: + formatted = [] + for device in devices: + 
formatted_device = { + "Device ID": device["device_id"], + "Device Name": device["device_name"], + "Platform": device["platform"], + "OS Version": device["os_version"], + "Application Firewall Activated": device["application_firewall_status"], + "Lock Screensaver Interval": device[ + "desktop_and_screensaver_screensaver_interval" + ], + } + formatted.append(formatted_device) + return formatted diff --git a/examples/access_review/workflows/kandji_device_information/kandji_device_information.yaml b/examples/access_review/workflows/kandji_device_information/kandji_device_information.yaml new file mode 100644 index 00000000..296e5249 --- /dev/null +++ b/examples/access_review/workflows/kandji_device_information/kandji_device_information.yaml @@ -0,0 +1,149 @@ +controls: null +dag: + format_json_to_list_view_string: + args: + json_value: "{{ formatted_devices }}" + children: + - send_slack_message + id: format_json_to_list_view_string + result_name: message + secrets_mapping: {} + type: format_json_to_list_view_string + get_kandji_application_firewall: + args: {} + children: + - select_fields_from_objects_in_list_1 + id: get_kandji_application_firewall + result_name: device_status_application_firewall + secrets_mapping: + KANDJI_SECRET: kandji_secret + type: get_kandji_application_firewall + get_kandji_desktop_and_screensaver: + args: {} + children: + - select_fields_from_objects_in_list_2 + id: get_kandji_desktop_and_screensaver + result_name: device_status_desktop_and_screensaver + secrets_mapping: + KANDJI_SECRET: kandji_secret + type: get_kandji_desktop_and_screensaver + join_lists: + args: + key_prefix_list2: application_firewall_ + list1: "{{ devices }}" + list1_join_key_paths: + - - device_id + list2: "{{ device_status_application_firewall }}" + list2_join_key_paths: + - - device_id + children: + - join_lists_1 + id: join_lists + result_name: result + secrets_mapping: {} + type: join_lists + join_lists_1: + args: + key_prefix_list2: desktop_and_screensaver_ + list1: "{{ result }}" + list1_join_key_paths: + - - device_id + list2: "{{ device_status_desktop_and_screensaver }}" + list2_join_key_paths: + - - device_id + children: + - transform_kandji_devices_information + id: join_lists_1 + result_name: result + secrets_mapping: {} + type: join_lists + list_kandji_devices: + args: {} + children: + - select_fields_from_objects_in_list + id: list_kandji_devices + result_name: devices + secrets_mapping: + KANDJI_SECRET: kandji_secret + type: list_kandji_devices + select_fields_from_objects_in_list: + args: + fields: + - device_id + - device_name + - platform + - os_version + input_list: "{{ devices }}" + children: + - join_lists + id: select_fields_from_objects_in_list + result_name: devices + secrets_mapping: {} + type: select_fields_from_objects_in_list + select_fields_from_objects_in_list_1: + args: + fields: + - device_id + - status + input_list: "{{ device_status_application_firewall }}" + children: + - join_lists + id: select_fields_from_objects_in_list_1 + result_name: device_status_application_firewall + secrets_mapping: {} + type: select_fields_from_objects_in_list + select_fields_from_objects_in_list_2: + args: + fields: + - device_id + - screensaver_interval + input_list: "{{ device_status_desktop_and_screensaver }}" + children: + - join_lists_1 + id: select_fields_from_objects_in_list_2 + result_name: device_status_desktop_and_screensaver + secrets_mapping: {} + type: select_fields_from_objects_in_list + send_slack_message: + args: + channel_id: C06QP0KV1L2 + text: "Kandji Devices 
Information: + + {{ message }}" + children: [] + id: send_slack_message + result_name: null + secrets_mapping: + SLACK_SECRET: slack_secret + type: send_slack_message + start: + args: {} + children: + - list_kandji_devices + - get_kandji_application_firewall + - get_kandji_desktop_and_screensaver + id: start + result_name: payload + secrets_mapping: {} + type: start + transform_kandji_devices_information: + args: + devices: "{{ result }}" + children: + - format_json_to_list_view_string + id: transform_kandji_devices_information + result_name: formatted_devices + secrets_mapping: {} + type: transform_kandji_devices_information +description: Alert for unencrypted managed devices in Kandji via Slack +name: kandji_device_information +start: + triggers: + - cron: null + default_args: [] + interval_days: 1 + interval_hours: null + interval_minutes: null + interval_seconds: null + type: schedule +version: "1" diff --git a/examples/access_review/workflows/kandji_unencrypted_devices.py b/examples/access_review/workflows/kandji_unencrypted_devices.py deleted file mode 100644 index 23240a4b..00000000 --- a/examples/access_review/workflows/kandji_unencrypted_devices.py +++ /dev/null @@ -1,47 +0,0 @@ -""" - -admyral workflow push kandji_alert_for_unencrypted_devices -f workflows/kandji_unencrypted_devices.py --activate - -""" - -from admyral.workflow import workflow, Schedule -from admyral.typings import JsonValue - -from admyral.actions import ( - list_kandji_devices, - format_json_to_list_view_string, - send_slack_message, - select_fields_from_objects_in_list, -) - - -@workflow( - description="Alert for unencrypted managed devices in Kandji via Slack", - triggers=[Schedule(interval_days=7)], -) -def kandji_alert_for_unencrypted_devices(payload: dict[str, JsonValue]): - unencrypted_devices = list_kandji_devices( - last_checkin_within_days=90, - blueprints=[ - "Default Blueprint" - ], # TODO: set your blueprints here if you want to filter by blueprints - platform="Mac", - filevault_enabled=False, - secrets={"KANDJI_SECRET": "kandji_secret"}, - ) - - if unencrypted_devices: - selected_fields = select_fields_from_objects_in_list( - input_list=unencrypted_devices, - fields=["device_name", "device_id"], - ) - - formatted_string = format_json_to_list_view_string( - json_value=selected_fields, - ) - - send_slack_message( - channel_id="C06QP0KV1L2", # TODO: set your channel id here - text=f"🚨 Unencrypted devices detected 🚨\n\n{formatted_string}", - secrets={"SLACK_SECRET": "slack_secret"}, - ) diff --git a/examples/access_review/workflows/list_okta_admins.yaml b/examples/access_review/workflows/list_okta_admins.yaml new file mode 100644 index 00000000..3921e6da --- /dev/null +++ b/examples/access_review/workflows/list_okta_admins.yaml @@ -0,0 +1,34 @@ +controls: null +dag: + okta_get_all_user_types: + args: {} + children: + - okta_search_users + id: okta_get_all_user_types + result_name: user_types + secrets_mapping: + OKTA_SECRET: okta_secret + type: okta_get_all_user_types + okta_search_users: + args: + search: type.id eq "{{ user_types[0]['id'] }}" + children: [] + id: okta_search_users + result_name: null + secrets_mapping: + OKTA_SECRET: okta_secret + type: okta_search_users + start: + args: {} + children: + - okta_get_all_user_types + id: start + result_name: payload + secrets_mapping: {} + type: start +description: Retrieves all user types from Okta and lists the corresponding admin + users. 
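The list_okta_admins DAG above chains two Okta actions: the result of okta_get_all_user_types (bound to user_types) feeds the search filter of okta_search_users via Okta's filter syntax type.id eq "...". For readers who find the DAG easier to follow as code, the following is a rough, hypothetical reconstruction in plain Python; it assumes okta_get_all_user_types and okta_search_users are importable from admyral.actions as their node types suggest, and the YAML file remains the authoritative definition.

from admyral.typings import JsonValue
from admyral.actions import okta_get_all_user_types, okta_search_users


def list_okta_admins(payload: dict[str, JsonValue]) -> None:
    # Fetch all Okta user types; the workflow treats the first returned type
    # as the admin user type, mirroring user_types[0]['id'] in the YAML DAG.
    user_types = okta_get_all_user_types(
        secrets={"OKTA_SECRET": "okta_secret"},
    )

    # List the users of that type with Okta's search filter syntax. The result
    # is not bound to a variable, matching result_name: null in the DAG.
    okta_search_users(
        search=f'type.id eq "{user_types[0]["id"]}"',
        secrets={"OKTA_SECRET": "okta_secret"},
    )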
+name: list_okta_admins +start: + triggers: [] +version: '1' diff --git a/examples/access_review/workflows/monitor_and_follow_up_merged_github_prs_without_approval.py b/examples/access_review/workflows/monitor_and_follow_up_merged_github_prs_without_approval.py deleted file mode 100644 index 5dabfd6a..00000000 --- a/examples/access_review/workflows/monitor_and_follow_up_merged_github_prs_without_approval.py +++ /dev/null @@ -1,393 +0,0 @@ -""" - -How to push this workflow to Admyral: - -1. Replace the placeholders marked with TODO in the workflow code below. - -2. Save the changes. - -3. Use the following CLI commands for pushing the workflow (WARNING: Make sure that the file paths are correct): - -admyral action push count_number_of_changes_in_git_diff -a workflows/monitor_and_follow_up_merged_github_prs_without_approval.py -admyral action push contains_pull_request_approval_as_comment_after_merge -a workflows/monitor_and_follow_up_merged_github_prs_without_approval.py -admyral action push clean_git_diff -a workflows/monitor_and_follow_up_merged_github_prs_without_approval.py -admyral workflow push monitor_merged_github_prs_without_approval -f workflows/monitor_and_follow_up_merged_github_prs_without_approval.py --activate -admyral workflow push handle_merged_github_pr_without_approval -f workflows/monitor_and_follow_up_merged_github_prs_without_approval.py --activate - -4. Connect to the following tools (see docs.admyral.dev for more information): - - - Slack - - GitHub - - Jira - - OpenAI - -""" - -from typing import Annotated -from dateutil import parser - -from admyral.workflow import workflow, Schedule -from admyral.typings import JsonValue -from admyral.action import action, ArgumentMetadata -from admyral.actions import ( - list_github_merged_pull_requests_without_approval, - get_time_interval_of_last_n_days, - send_slack_message, - send_list_elements_to_workflow, - compare_two_github_commits, - transform, - send_slack_message_to_user_by_email, - wait, - create_jira_issue, - openai_chat_completion, - list_github_issue_comments, -) - - -@action( - display_name="Count Number of Changes in Git Diff", - display_namespace="GitHub", - description="Count the number of changes in a git diff", -) -def count_number_of_changes_in_git_diff( - git_diff: Annotated[ - dict[str, JsonValue], - ArgumentMetadata( - display_name="Git Diff", - description="The git diff to be checked", - ), - ], -) -> int: - return sum(map(lambda file: file["changes"], git_diff["files"])) - - -@action( - display_name="Contains Pull Request Approval as Comment After Merge?", - display_namespace="GitHub", - description='Check whether there is a comment "approved" in the review history after the merge.', - requirements=["dateutil"], -) -def contains_pull_request_approval_as_comment_after_merge( - comments: Annotated[ - list[dict[str, JsonValue]], - ArgumentMetadata( - display_name="Comments", - description="The comments of the pull request.", - ), - ], - merged_at: Annotated[ - str, - ArgumentMetadata( - display_name="Merged At", - description="The timestamp when the pull request was merged.", - ), - ], - approval_keywords: Annotated[ - str | list[str], - ArgumentMetadata( - display_name="Approval Keywords", - description="The keywords to check for approval.", - ), - ] = ["approved"], -) -> bool: - merged_at = parser.parse(merged_at) - allowed_approval_keywords = set( - approval_keywords - if isinstance(approval_keywords, list) - else [approval_keywords] - ) - return any( - map( - lambda review: review.get("body", "") in 
allowed_approval_keywords - and parser.parse(review.get("created_at")) > merged_at, - comments, - ) - ) - - -@action( - display_name="Clean Git Diff", - display_namespace="GitHub", - description="Clean the git diff to remove unnecessary information.", -) -def clean_git_diff( - git_diff: Annotated[ - str, - ArgumentMetadata( - display_name="Git Diff", - description="The git diff to be cleaned", - ), - ], -) -> dict[str, JsonValue]: - return "\n".join( - filter( - lambda line: line.startswith("+") or line.startswith("-"), - git_diff.split("\n"), - ) - ) - - -@workflow( - description="Monitor Merged GitHub PRs Without Approval", - triggers=[Schedule(interval_days=1)], -) -def monitor_merged_github_prs_without_approval(payload: dict[str, JsonValue]): - last_day_time_interval = get_time_interval_of_last_n_days(n_days=1) - - unreviewed_prs = list_github_merged_pull_requests_without_approval( - repo_owner="Admyral-Security", # TODO: set your repo owner here - repo_name="Admyral_Github_Integration_Test", # TODO: set your repo name here - start_time=last_day_time_interval[0], - end_time=last_day_time_interval[1], - secrets={"GITHUB_SECRET": "github_secret"}, - ) - - send_list_elements_to_workflow( - workflow_name="handle_merged_github_pr_without_approval", - elements=unreviewed_prs, - shared_data={ - "repo_owner": "Admyral-Security", # TODO: set your repo owner here - "repo_name": "Admyral_Github_Integration_Test", # TODO: set your repo name here - }, - ) - - -@workflow( - description="Handle Merged GitHub PRs Without Approval", -) -def handle_merged_github_pr_without_approval(payload: dict[str, JsonValue]): - commit_diff_info = compare_two_github_commits( - repo_owner=payload["shared"]["repo_owner"], - repo_name=payload["shared"]["repo_name"], - base=payload["element"]["last_approved_commit_id"], - head=payload["element"]["last_commit_id"], - diff_type="json", - secrets={"GITHUB_SECRET": "github_secret"}, - ) - - line_changes_count = count_number_of_changes_in_git_diff(git_diff=commit_diff_info) - - if line_changes_count < 50: - # Perform classification of the git diff changes - commit_diff = compare_two_github_commits( - repo_owner=payload["shared"]["repo_owner"], - repo_name=payload["shared"]["repo_name"], - base=payload["element"]["last_approved_commit_id"], - head=payload["element"]["last_commit_id"], - diff_type="diff", - secrets={"GITHUB_SECRET": "github_secret"}, - ) - - commit_diff_cleaned = clean_git_diff(git_diff=commit_diff) - - git_diff_summary = openai_chat_completion( - model="gpt-4o", - prompt=f"You are an expert level software engineer and should summarize git diffs." - "For the git differences, it's important to check for code additions (+) or code deletions (-). Lines which are not prefixed with + or - did not change. Please\n" - "pay close attention to the +/- and only consider true code changes in your summary! Here are some examples:\n" - "\n" - "Example 1: \n" - "\n" - "diff --git a/src/union_find.rs b/src/union_find.rs\n" - "index ee49a24..8e3eb35 100644 \n" - "--- a/src/union_find.rs \n" - "+++ b/src/union_find.rs \n" - "@@ -19,16 +19,6 @@ pub struct UnionFind {{ \n" - "}} \n" - "\n" - "impl UnionFind {{ \n" - "- /// Creates an empty Union-Find structure with a specified capacity. 
\n" - "- pub fn with_capacity(capacity: usize) -> Self {{ \n" - "- Self {{ \n" - "- parent_links: Vec::with_capacity(capacity), \n" - "- sizes: Vec::with_capacity(capacity), \n" - "- payloads: HashMap::with_capacity(capacity), \n" - "- count: 0, \n" - "- }} \n" - "- }} \n" - "- \n" - " /// Inserts a new item (disjoint set) into the data structure. \n" - " pub fn insert(&mut self, item: T) {{ \n" - " let key = self.payloads.len(); \n" - "\n" - "\n" - "\n" - "This diff removes the with_capacity method from the UnionFind struct implementation in the file union_find.rs. The method was used to create an empty Union-Find structure with a \n" - "specified initial capacity. Its removal simplifies the API by eliminating this initialization option. \n" - "\n" - "\n" - "Example 2: \n" - "\n" - "diff --git a/web/src/components/workflow-editor/edit-panel/action-edit-panel.tsx b/web/src/components/workflow-editor/edit-panel/action-edit-panel.tsx \n" - "index ea112c9..e457dd0 100644 \n" - "--- a/web/src/components/workflow-editor/edit-panel/action-edit-panel.tsx \n" - "+++ b/web/src/components/workflow-editor/edit-panel/action-edit-panel.tsx \n" - "@@ -167,18 +167,20 @@ export default function ActionEditPanel() {{ \n" - ' \n' - " Type: {{argument.argType}} \n" - " \n" - "-