Skip to content

Commit

Permalink
[4.3] PISTON-397: handle agent channels that are destroyed quickly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jun 2, 2020
1 parent bf8ea2d commit d4afa8f
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 35 deletions.
3 changes: 3 additions & 0 deletions applications/acdc/src/acdc.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
-define(NEW_CHANNEL_FROM(CallId), {'call_from', CallId}).
-define(NEW_CHANNEL_TO(CallId, MemberCallId), {'call_to', CallId, MemberCallId}).

-define(DESTROYED_CHANNEL_REG(AcctId, User), {'p', 'l', {'destroyed_channel', AcctId, User}}).
-define(DESTROYED_CHANNEL(CallId, HangupCause), {'call_down', CallId, HangupCause}).

-type abandon_reason() :: ?ABANDON_TIMEOUT | ?ABANDON_EXIT |
?ABANDON_HANGUP.

Expand Down
50 changes: 26 additions & 24 deletions applications/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ call_event(ServerRef, <<"call_event">>, <<"CHANNEL_UNBRIDGE">>, JObj) ->
call_event(ServerRef, <<"call_event">>, <<"usurp_control">>, JObj) ->
gen_statem:cast(ServerRef, {'usurp_control', call_id(JObj)});
call_event(ServerRef, <<"call_event">>, <<"CHANNEL_DESTROY">>, JObj) ->
ServerRef ! {'channel_hungup', call_id(JObj), hangup_cause(JObj)};
ServerRef ! ?DESTROYED_CHANNEL(call_id(JObj), acdc_util:hangup_cause(JObj));
call_event(ServerRef, <<"call_event">>, <<"CHANNEL_DISCONNECTED">>, JObj) ->
ServerRef ! ?DESTROYED_CHANNEL(call_id(JObj), <<"MEDIA_SERVER_UNREACHABLE">>);
call_event(ServerRef, <<"call_event">>, <<"LEG_CREATED">>, JObj) ->
gen_statem:cast(ServerRef, {'leg_created', call_id(JObj)});
call_event(ServerRef, <<"call_event">>, <<"LEG_DESTROYED">>, JObj) ->
Expand Down Expand Up @@ -700,7 +702,7 @@ ready('info', ?NEW_CHANNEL_TO(CallId, 'undefined'), State) ->
{'next_state', 'outbound', start_outbound_call_handling(CallId, State), 'hibernate'};
ready('info', ?NEW_CHANNEL_TO(CallId, MemberCallId), State) ->
cancel_if_failed_originate(CallId, MemberCallId, 'ready', State);
ready('info', {'channel_hungup', CallId, _Cause}, #state{agent_listener=AgentListener
ready('info', ?DESTROYED_CHANNEL(CallId, _Cause), #state{agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
}=State) ->
case lists:member(CallId, OutboundCallIds) of
Expand Down Expand Up @@ -941,7 +943,7 @@ ringing('info', ?NEW_CHANNEL_TO(CallId, MemberCallId), #state{member_call_id=Mem
{'next_state', 'ringing', State};
ringing('info', ?NEW_CHANNEL_TO(CallId, MemberCallId), State) ->
cancel_if_failed_originate(CallId, MemberCallId, 'ringing', State);
ringing('info', {'channel_hungup', AgentCallId, Cause}, #state{agent_listener=AgentListener
ringing('info', ?DESTROYED_CHANNEL(AgentCallId, Cause), #state{agent_listener=AgentListener
,agent_call_id=AgentCallId
,account_id=AccountId
,agent_id=AgentId
Expand All @@ -965,15 +967,15 @@ ringing('info', {'channel_hungup', AgentCallId, Cause}, #state{agent_listener=Ag
'paused' -> {'next_state', 'paused', State1};
'ready' -> apply_state_updates(State1)
end;
ringing('info', {'channel_hungup', MemberCallId, _Cause}, #state{agent_listener=AgentListener
ringing('info', ?DESTROYED_CHANNEL(MemberCallId, _Cause), #state{agent_listener=AgentListener
,member_call_id=MemberCallId
}=State) ->
lager:debug("caller's channel (~s) has gone down, stop agent's call: ~s", [MemberCallId, _Cause]),
acdc_agent_listener:channel_hungup(AgentListener, MemberCallId),

acdc_agent_listener:presence_update(AgentListener, ?PRESENCE_GREEN),
apply_state_updates(clear_call(State, 'ready'));
ringing('info', {'channel_hungup', CallId, _Cause}, #state{agent_listener=AgentListener
ringing('info', ?DESTROYED_CHANNEL(CallId, _Cause), #state{agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
}=State) ->
case lists:member(CallId, OutboundCallIds) of
Expand Down Expand Up @@ -1111,12 +1113,12 @@ answered('info', ?NEW_CHANNEL_TO(CallId, 'undefined'), #state{agent_listener=Age
answered('info', ?NEW_CHANNEL_TO(CallId, MemberCallId), #state{member_call_id=MemberCallId}=State) ->
lager:debug("new channel ~s for agent", [CallId]),
{'next_state', 'answered', State};
answered('info', {'channel_hungup', CallId, Cause}, #state{member_call_id=CallId
answered('info', ?DESTROYED_CHANNEL(CallId, Cause), #state{member_call_id=CallId
,outbound_call_ids=[]
}=State) ->
lager:debug("caller's channel hung up: ~s", [Cause]),
{'next_state', 'wrapup', State#state{wrapup_ref=hangup_call(State, 'member')}};
answered('info', {'channel_hungup', CallId, _Cause}, #state{account_id=AccountId
answered('info', ?DESTROYED_CHANNEL(CallId, _Cause), #state{account_id=AccountId
,agent_id=AgentId
,agent_listener=AgentListener
,member_call_id=CallId
Expand All @@ -1129,12 +1131,12 @@ answered('info', {'channel_hungup', CallId, _Cause}, #state{account_id=AccountId
acdc_agent_listener:channel_hungup(AgentListener, CallId),
maybe_notify(Ns, ?NOTIFY_HANGUP, State),
{'next_state', 'outbound', start_outbound_call_handling(OutboundCallId, clear_call(State, 'ready')), 'hibernate'};
answered('info', {'channel_hungup', CallId, Cause}, #state{agent_call_id=CallId
answered('info', ?DESTROYED_CHANNEL(CallId, Cause), #state{agent_call_id=CallId
,outbound_call_ids=[]
}=State) ->
lager:debug("agent's channel has hung up: ~s", [Cause]),
{'next_state', 'wrapup', State#state{wrapup_ref=hangup_call(State, 'agent')}};
answered('info', {'channel_hungup', CallId, _Cause}, #state{account_id=AccountId
answered('info', ?DESTROYED_CHANNEL(CallId, _Cause), #state{account_id=AccountId
,agent_id=AgentId
,agent_listener=AgentListener
,member_call_id=MemberCallId
Expand All @@ -1148,7 +1150,7 @@ answered('info', {'channel_hungup', CallId, _Cause}, #state{account_id=AccountId
acdc_agent_listener:channel_hungup(AgentListener, MemberCallId),
maybe_notify(Ns, ?NOTIFY_HANGUP, State),
{'next_state', 'outbound', start_outbound_call_handling(OutboundCallId, clear_call(State, 'ready')), 'hibernate'};
answered('info', {'channel_hungup', CallId, _Cause}, #state{agent_listener=AgentListener
answered('info', ?DESTROYED_CHANNEL(CallId, _Cause), #state{agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
}=State) ->
case lists:member(CallId, OutboundCallIds) of
Expand Down Expand Up @@ -1196,8 +1198,6 @@ wrapup('cast', {'sync_req', JObj}, #state{agent_listener=AgentListener
{'next_state', 'wrapup', State};
wrapup('cast', {'sync_resp', _}, State) ->
{'next_state', 'wrapup', State};
wrapup('cast', {'channel_hungup', _, _}, State) ->
{'next_state', 'wrapup', State};
wrapup('cast', {'leg_destroyed', CallId}, #state{agent_listener=AgentListener}=State) ->
lager:debug("leg ~s destroyed", [CallId]),
acdc_agent_listener:channel_hungup(AgentListener, CallId),
Expand Down Expand Up @@ -1353,7 +1353,7 @@ outbound('info', ?NEW_CHANNEL_TO(CallId, 'undefined'), #state{agent_listener=Age
{'next_state', 'outbound', State#state{outbound_call_ids=[CallId | lists:delete(CallId, OutboundCallIds)]}};
outbound('info', ?NEW_CHANNEL_TO(CallId, MemberCallId), State) ->
cancel_if_failed_originate(CallId, MemberCallId, 'outbound', State);
outbound('info', {'channel_hungup', CallId, Cause}, #state{agent_listener=AgentListener
outbound('info', ?DESTROYED_CHANNEL(CallId, Cause), #state{agent_listener=AgentListener
,outbound_call_ids=OutboundCallIds
}=State) ->
acdc_agent_listener:channel_hungup(AgentListener, CallId),
Expand Down Expand Up @@ -1509,7 +1509,7 @@ handle_info(?NEW_CHANNEL_FROM(_CallId), StateName, State) ->
{'next_state', StateName, State};
handle_info(?NEW_CHANNEL_TO(_CallId, _), StateName, State) ->
{'next_state', StateName, State};
handle_info({'channel_hungup', _, _}, StateName, State) ->
handle_info(?DESTROYED_CHANNEL(_, _), StateName, State) ->
{'next_state', StateName, State};
handle_info(_Info, StateName, State) ->
lager:debug("unhandled message in state ~s: ~p", [StateName, _Info]),
Expand Down Expand Up @@ -1596,13 +1596,6 @@ call_id(JObj) ->
CallId -> CallId
end.

-spec hangup_cause(kz_json:object()) -> kz_term:ne_binary().
hangup_cause(JObj) ->
case kz_json:get_value(<<"Hangup-Cause">>, JObj) of
'undefined' -> <<"unknown">>;
Cause -> Cause
end.

%% returns time left in seconds
-spec time_left(reference() | 'false' | kz_term:api_integer()) -> kz_term:api_integer().
time_left(Ref) when is_reference(Ref) ->
Expand Down Expand Up @@ -1769,26 +1762,35 @@ find_endpoint_id(EP, 'undefined') -> kz_json:get_value(<<"Endpoint-ID">>, EP);
find_endpoint_id(_EP, EPId) -> EPId.

-spec monitor_endpoint(kz_json:object(), kz_term:ne_binary(), kz_types:server_ref()) -> any().
monitor_endpoint('undefined', _, _) -> 'ok';
monitor_endpoint(EP, AccountId, AgentListener) ->
Username = find_username(EP),

%% Bind for outbound call requests
acdc_agent_listener:add_endpoint_bindings(AgentListener
,kz_endpoint:get_sip_realm(EP, AccountId)
,find_username(EP)
),

%% Inform us of device changes
catch gproc:reg(?ENDPOINT_UPDATE_REG(AccountId, find_endpoint_id(EP))),
catch gproc:reg(?NEW_CHANNEL_REG(AccountId, find_username(EP))).
catch gproc:reg(?NEW_CHANNEL_REG(AccountId, Username)),
catch gproc:reg(?DESTROYED_CHANNEL_REG(AccountId, Username)).

-spec unmonitor_endpoint(kz_json:object(), kz_term:ne_binary(), kz_types:server_ref()) -> any().
unmonitor_endpoint(EP, AccountId, AgentListener) ->
Username = find_username(EP),

%% Bind for outbound call requests
acdc_agent_listener:remove_endpoint_bindings(AgentListener
,kz_endpoint:get_sip_realm(EP, AccountId)
,find_username(EP)
),

%% Inform us of device changes
catch gproc:unreg(?ENDPOINT_UPDATE_REG(AccountId, kz_doc:id(EP))),
catch gproc:unreg(?NEW_CHANNEL_REG(AccountId, find_username(EP))).
catch gproc:unreg(?ENDPOINT_UPDATE_REG(AccountId, find_endpoint_id(EP))),
catch gproc:unreg(?NEW_CHANNEL_REG(AccountId, Username)),
catch gproc:unreg(?DESTROYED_CHANNEL_REG(AccountId, Username)).

-spec maybe_add_endpoint(kz_term:ne_binary(), kz_json:object(), kz_json:objects(), kz_term:ne_binary(), kz_types:server_ref()) -> any().
maybe_add_endpoint(EPId, EP, EPs, AccountId, AgentListener) ->
Expand Down
40 changes: 32 additions & 8 deletions applications/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
,handle_sync_resp/2
,handle_call_event/2
,handle_new_channel/2
,handle_destroyed_channel/2
,handle_originate_resp/2
,handle_member_message/2
,handle_agent_message/2
,handle_config_change/2
,handle_presence_probe/2
,handle_destroy/2
]).

-include("acdc.hrl").
Expand Down Expand Up @@ -256,14 +256,44 @@ handle_new_channel_acct(JObj, AccountId) ->

lager:debug("new channel in acct ~s: from ~s to ~s(~s)", [AccountId, FromUser, ToUser, ReqUser]),

case kz_json:get_value(<<"Call-Direction">>, JObj) of
case kz_call_event:call_direction(JObj) of
<<"inbound">> -> gproc:send(?NEW_CHANNEL_REG(AccountId, FromUser), ?NEW_CHANNEL_FROM(CallId));
<<"outbound">> ->
gproc:send(?NEW_CHANNEL_REG(AccountId, ToUser), ?NEW_CHANNEL_TO(CallId, MemberCallId)),
gproc:send(?NEW_CHANNEL_REG(AccountId, ReqUser), ?NEW_CHANNEL_TO(CallId, MemberCallId));
_ -> lager:debug("invalid call direction for call ~s", [CallId])
end.

%%------------------------------------------------------------------------------
%% @doc Send event to agent FSM when channels are destroyed. This occurs in
%% addition to the above handle_call_event/2. Though this is redundant
%% in most cases, it will keep the agent from becoming stuck in the
%% outbound state if a channel is created and destroyed before the
%% acdc_agent_listener gen_listener can bind to it.
%%
%% @end
%%------------------------------------------------------------------------------
-spec handle_destroyed_channel(kz_json:object(), kz_term:api_binary()) -> 'ok'.
handle_destroyed_channel(JObj, AccountId) ->
FromUser = hd(binary:split(kz_json:get_value(<<"From">>, JObj), <<"@">>)),
ToUser = hd(binary:split(kz_json:get_value(<<"To">>, JObj), <<"@">>)),

CallId = kz_json:get_value(<<"Call-ID">>, JObj),
HangupCause = acdc_util:hangup_cause(JObj),

lager:debug("destroyed channel in acct ~s: from ~s to ~s", [AccountId, FromUser, ToUser]),

case kz_call_event:call_direction(JObj) of
<<"inbound">> -> gproc:send(?DESTROYED_CHANNEL_REG(AccountId, FromUser)
,?DESTROYED_CHANNEL(CallId, HangupCause));
<<"outbound">> ->
gproc:send(?DESTROYED_CHANNEL_REG(AccountId, FromUser)
,?DESTROYED_CHANNEL(CallId, HangupCause)),
gproc:send(?DESTROYED_CHANNEL_REG(AccountId, ToUser)
,?DESTROYED_CHANNEL(CallId, HangupCause));
_ -> 'ok'
end.

-spec handle_originate_resp(kz_json:object(), kz_term:proplist()) -> 'ok'.
handle_originate_resp(JObj, Props) ->
case kz_json:get_value(<<"Event-Name">>, JObj) of
Expand Down Expand Up @@ -443,12 +473,6 @@ send_probe(JObj, State) ->
],
kapi_presence:publish_update(PresenceUpdate).

-spec handle_destroy(kz_json:object(), kz_term:proplist()) -> 'ok'.
handle_destroy(JObj, Props) ->
'true' = kapi_call:event_v(JObj),
FSM = props:get_value('fsm_pid', Props),
acdc_agent_fsm:call_event(FSM, <<"call_event">>, <<"CHANNEL_DESTROY">>, JObj).

presence_id(JObj) ->
presence_id(JObj, 'undefined').
presence_id(JObj, AgentId) ->
Expand Down
3 changes: 0 additions & 3 deletions applications/acdc/src/acdc_agent_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@
,{{'acdc_agent_handler', 'handle_agent_message'}
,[{<<"agent">>, <<"*">>}]
}
,{{'acdc_agent_handler', 'handle_destroy'}
,[{<<"channel">>, <<"destroy">>}]
}
,{{'acdc_agent_handler', 'handle_config_change'}
,[{<<"configuration">>, <<"*">>}]
}
Expand Down
3 changes: 3 additions & 0 deletions applications/acdc/src/acdc_agent_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ handle_info(?HOOK_EVT(AccountId, <<"CHANNEL_CREATE">>, JObj), State) ->
lager:debug("channel_create event"),
_ = kz_util:spawn(fun acdc_agent_handler:handle_new_channel/2, [JObj, AccountId]),
{'noreply', State};
handle_info(?HOOK_EVT(AccountId, <<"CHANNEL_DESTROY">>, JObj), State) ->
_ = kz_util:spawn(fun acdc_agent_handler:handle_destroyed_channel/2, [JObj, AccountId]),
{'noreply', State};
handle_info(?HOOK_EVT(_AccountId, _EventName, _JObj), State) ->
lager:debug("ignoring ~s for account ~s on call ~s", [_EventName, _AccountId, kz_json:get_value(<<"Call-ID">>, _JObj)]),
{'noreply', State};
Expand Down
8 changes: 8 additions & 0 deletions applications/acdc/src/acdc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
,agent_presence_update/2
,presence_update/3, presence_update/4
,send_cdr/2
,hangup_cause/1
]).

-include("acdc.hrl").
Expand Down Expand Up @@ -138,3 +139,10 @@ proc_id(Pid) -> proc_id(Pid, node()).

-spec proc_id(pid(), atom() | kz_term:ne_binary()) -> kz_term:ne_binary().
proc_id(Pid, Node) -> list_to_binary([kz_term:to_binary(Node), "-", pid_to_list(Pid)]).

-spec hangup_cause(kz_json:object()) -> kz_term:ne_binary().
hangup_cause(JObj) ->
case kz_json:get_ne_binary_value(<<"Hangup-Cause">>, JObj) of
'undefined' -> <<"unknown">>;
Cause -> Cause
end.

0 comments on commit d4afa8f

Please sign in to comment.