Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature evensub #58

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
276 changes: 274 additions & 2 deletions app.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,248 @@ function nonce( length ) {
return text;
}

/**
*
* @param { string } password
* @param { string[] } requiredScopes
* @returns { string | null }
*/
async function fetchClientIdIfValidAsync( password, requiredScopes ) {
let err = null;
let validation = await fetch( "https://id.twitch.tv/oauth2/validate", {
headers: {
"Authorization": `OAuth ${password}`
}
}).then( r => r.json() )
.catch( e => (err = e, null));

if ( err || validation === null ) {
console.error( "Error fetching validation: ", err );
return null;
}

if ( !validation.client_id ) {
console.error( "Invalid Password" );
return null;
}

const missingScopes = requiredScopes.filter( scope => !validation.scopes.includes( scope ) );

if ( missingScopes.length ) {
console.error( "Missing required scopes: ", missingScopes.join(", ") );
return null;
}

return validation.client_id;
}

/**
*
* @param { string } channel
* @param { string } clientId
* @param { string } password
* @returns { string | null }
*/
async function fetchChannelIdAsync( channel, clientId, password ) {
let err;
let userInfo = await fetch( "https://api.twitch.tv/helix/users?login=" + channel, {
headers: {
"Client-ID": clientId,
"Authorization": `Bearer ${password}`
}
}).then( r => r.json() )
.catch( e => (err = e, null));

if ( err || userInfo === null ) {
console.error( "Error fetching user info: ", err );
return null;
}

return userInfo.data[ 0 ].id;
}

/**
*
* @param { string } type
* @param { string } version
* @param { string } clientId
* @param { string } password
* @param { string } channelId
* @param { string } sessionId
* @returns { boolean } was the subscription successful
*/
async function subscribeToEventAsync( type, version, clientId, password, channelId, sessionId ) {
let err;
await fetch( "https://api.twitch.tv/helix/eventsub/subscriptions", {
method: "POST",
headers: {
"Client-ID": clientId,
"Authorization": `Bearer ${password}`,
"Content-Type": "application/json"
},
body: JSON.stringify( {
type,
version,
condition: {
broadcaster_user_id: channelId,
},
transport: {
method: "websocket",
session_id: sessionId
}
} )
})
.catch( e => (err = e, console.error( "Error subscribing to event: ", type, e )));


return !err;
}

async function eventSubConnectAsync( channel, password, clientId = null, channelId = null, connectionName = null, sessionId = null, clearObject = null) {
/** @type { [string, string][] } */
const subscribtions = [
[ "channel.channel_points_automatic_reward_redemption.add", "1" ],
[ "channel.channel_points_custom_reward_redemption.add", "1" ],
];

password = password.replace( "oauth:", "" );

if ( !clientId ) {
clientId = await fetchClientIdIfValidAsync( password, [ "channel:read:redemptions", "user:read:email" ] );
if ( clientId === null ) {
return;
}
}

if ( !channelId ) {
channelId = await fetchChannelIdAsync( channel, clientId, password );
if ( channelId === null ) {
return;
}
}

const keepAliveSeconds = 30;
if (!connectionName) {
connectionName = "wss://eventsub.wss.twitch.tv/ws";
if ( keepAliveSeconds !== 30 ) {
connectionName += "?keepalive_timeout_seconds=" + keepAliveSeconds;
}
}

const ws = typeof window !== "undefined"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be good to be able to have this websocket available outside of the function or to return it after the connect completes to be able to call it and disconnect or do other things with it. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we planning on using it outside point rewards?

https://dev.twitch.tv/docs/eventsub/handling-websocket-events/#revocation-message

IMPORTANT By default, you have 10 seconds from the time you receive the Welcome message to subscribe to an event, unless otherwise specified when connecting. If you don’t subscribe within this timeframe, the server closes the connection.

I'm not sure if that applies to first subscription or the entire lifecycle

i think it would be reasonable to return the obscured onDisconnect function

? new WebSocket( connectionName )
: new NodeSocket( connectionName );

/** @type { NodeJS.Timeout } */
let keepAliveTimeout;

// this way if we need to reconnect we can call this function again in returned function
clearObject = clearObject || {};
clearObject.onDisconnect = (reconnect = true) => {
clearTimeout(keepAliveTimeout);
ws.close();
if (reconnect) {
eventSubConnectAsync( channel, password, clientId, channelId, connectionName, sessionId, clearObject );
}
}

ws.onerror = function( error ) {
console.error( error );
clearObject.onDisconnect(false);
}

ws.onopen = function( event ) {
if ( comfyJS && comfyJS.isDebug ) {
console.log( "Connected to EventSub" );
}
}

ws.onmessage = function( event ) {
const message = JSON.parse(event.data);
if( message.type === "PING" ) {
ws.send( JSON.stringify( { type: 'PONG' } ) );
return;
}
switch( message.metadata.message_type ) {
case "session_welcome":
{
sessionId = message.session.id;
// account that the keepalive will happen within last second
keepAliveSeconds = message.session.keepalive_timeout_seconds + 1;
keepAliveTimeout = setTimeout(() => clearObject.onDisconnect(), keepAliveSeconds * 1000);

void Promise.all(
subscribtions.map(( [ type, version ] ) =>
subscribeToEventAsync( type, version, clientId, password, channelId, sessionId )
)
)
.then( r => !r.every(x => x) && clearObject.onDisconnect(false) );
break;
}
case "session_keepalive":
{
clearTimeout(keepAliveTimeout);
keepAliveTimeout = setTimeout(() => clearObject.onDisconnect(), keepAliveSeconds * 1000);
break;
}
case "session_reconnect":
{
connectionName = message.payload.session.reconnect_url;
clearTimeout(keepAliveTimeout);
clearObject.onDisconnect();
break;
}
case "revocation":
{
if (!subscribtions.map( ( [ type, _ ] ) => type).includes(message.payload.type)) {
break;
}
clearObject.onDisconnect(false);
break;
}
case "notification":
{
keepAliveTimeout = setTimeout(() => clearObject.onDisconnect(), keepAliveSeconds * 1000);
clearTimeout(keepAliveTimeout);
keepAliveTimeout = setTimeout(() => onDisconnect(), keepAliveSeconds * 1000);
const reward = message.payload.event.reward;
const rewardObj = {
id: reward.id,
channelId,
title: "title" in reward ? reward.title : null,
prompt: "prompt" in reward ? reward.prompt : null,
cost: reward.cost,
};
const extra = {
channelId: reward.broadcaster_user_id,
reward: rewardObj,
rewardFulfilled: message.payload.subscription.type === "channel.channel_points_automatic_reward_redemption.add"
|| message.payload.event.status.toLowerCase() === "fulfilled",
userId: message.payload.event.user_id,
username: message.payload.event.user_login,
displayName: message.payload.event.user_name,
customRewardId: message.payload.event.id,
redeemed_at: message.payload.event.redeemed_at,
};

comfyJS.onReward(
extra.displayName || extra.username,
rewardObj.title,
rewardObj.cost,
rewardObj.prompt || "",
extra,
);

break;
}
default:
break;
}
}

return () => clearObject.onDisconnect(false);
}

async function pubsubConnect( channel, password ) {
const heartbeatInterval = 1000 * 60; //ms between PING's
const reconnectInterval = 1000 * 3; //ms to wait before reconnect
Expand Down Expand Up @@ -218,8 +460,12 @@ var channelInfo = null;
var client = null;
var isFirstConnect = true;
var reconnectCount = 0;
// works as both a flag and a function to disconnect from eventsub
/** @type {(() => void) | boolean | undefined | null } */
var eventsubDisconnect = null;
var comfyJS = {
isDebug: false,
useEventSub: true,
chatModes: {},
version: function() {
return "@VERSION";
Expand Down Expand Up @@ -365,7 +611,7 @@ var comfyJS = {
GetClient: function() {
return client;
},
Init: function( username, password, channels, isDebug ) {
Init: function( username, password, channels, isDebug, useEventSub ) {
channels = channels || [ username ];
if( typeof channels === 'string' || channels instanceof String ) {
channels = [ channels ];
Expand All @@ -374,6 +620,8 @@ var comfyJS = {
throw new Error( "Channels is not an array" );
}
comfyJS.isDebug = isDebug;
eventsubDisconnect = null;
comfyJS.useEventSub = typeof useEventSub === "boolean" ? useEventSub : false;
mainChannel = channels[ 0 ];
var options = {
options: {
Expand Down Expand Up @@ -695,10 +943,34 @@ var comfyJS = {

// Setup PubSub (https://github.com/twitchdev/pubsub-javascript-sample)
if( password ) {
pubsubConnect( mainChannel, password );
if ( comfyJS.useEventSub ) {
eventSubConnectAsync( mainChannel, password )
.then( disconnect => {
if( typeof eventsubDisconnect !== "boolean" ) {
if( typeof disconnect !== "function" ) {
throw new Error( "EventSub connection failed" );
}
eventsubDisconnect = disconnect
return;
}
disconnect();
eventsubDisconnect = null;
})
.catch( (_) => {
pubsubConnect( mainChannel, password );
});
} else {
pubsubConnect( mainChannel, password );
}
}
},
Disconnect: function() {
if ( typeof eventsubDisconnect === "function" ) {
eventsubDisconnect();
eventsubDisconnect = null;
} else {
eventsubDisconnect = typeof eventsubDisconnect !== "undefined" ? true : null;
}
client.disconnect()
.catch( comfyJS.onError );
},
Expand Down
3 changes: 2 additions & 1 deletion types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ export interface ComfyJSInstance {
username: string,
password?: string,
channels?: string | string[],
isDebug?: boolean
isDebug?: boolean,
useEventSub?: boolean = true,
): void;
Disconnect(): void;

Expand Down