import { closure } from './utils'
import { CHANNEL_EVENTS, CHANNEL_STATES } from './constants'
import Push from './push'
import Timer from './timer'
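/**
 * A client-side handle for a single topic on a socket: it tracks the join
 * state, buffers pushes until the channel is joined, and dispatches incoming
 * events to the callbacks registered with `on()`.
 *
 * @param {string} topic
 * @param {(Object|function)} params
 * @param {Socket} socket
 */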
export default class Channel {
  /**
   * Creates the channel in the `closed` state and wires up the join push,
   * the rejoin timer and the internal close/error/reply handlers.
   *
   * @param {string} topic
   * @param {(Object|function)} params - wrapped with `closure()`; `{}` is used when falsy
   * @param {Socket} socket
   */
  constructor(topic, params, socket) {
    this.state = CHANNEL_STATES.closed
    this.topic = topic
    this.params = closure(params || {})
    this.socket = socket
    this.bindings = []
    this.bindingRef = 0
    this.timeout = this.socket.timeout
    this.joinedOnce = false
    this.joinPush = new Push(this, CHANNEL_EVENTS.join, this.params, this.timeout)
    this.pushBuffer = []
    // Refs returned by the socket hooks below. Nothing in this class reads
    // them; presumably the socket uses them to unsubscribe on removal
    // (TODO confirm against the Socket implementation).
    this.stateChangeRefs = []

    // Only attempt a rejoin while the socket is connected.
    this.rejoinTimer = new Timer(() => {
      if (this.socket.isConnected()) {
        this.rejoin()
      }
    }, this.socket.rejoinAfterMs)

    this.stateChangeRefs.push(this.socket.onError(() => this.rejoinTimer.reset()))
    this.stateChangeRefs.push(
      this.socket.onOpen(() => {
        this.rejoinTimer.reset()
        if (this.isErrored()) {
          this.rejoin()
        }
      }),
    )

    // Join acknowledged: mark as joined and flush everything that was
    // buffered by push() while the channel could not send.
    this.joinPush.receive('ok', () => {
      this.state = CHANNEL_STATES.joined
      this.rejoinTimer.reset()
      this.pushBuffer.forEach(pushEvent => pushEvent.send())
      this.pushBuffer = []
    })

    this.joinPush.receive('error', () => {
      this.state = CHANNEL_STATES.errored
      if (this.socket.isConnected()) {
        this.rejoinTimer.scheduleTimeout()
      }
    })

    // Internal close handler; registered first so it runs before user hooks.
    this.onClose(() => {
      this.rejoinTimer.reset()
      if (this.socket.hasLogger()) {
        this.socket.log('channel', `close ${this.topic} ${this.joinRef()}`)
      }
      this.state = CHANNEL_STATES.closed
      this.socket.remove(this)
    })

    this.onError((reason) => {
      if (this.socket.hasLogger()) {
        this.socket.log('channel', `error ${this.topic}`, reason)
      }
      if (this.isJoining()) {
        this.joinPush.reset()
      }
      this.state = CHANNEL_STATES.errored
      if (this.socket.isConnected()) {
        this.rejoinTimer.scheduleTimeout()
      }
    })

    // Join timed out: send a leave for the pending join, then mark the
    // channel errored and schedule a rejoin.
    this.joinPush.receive('timeout', () => {
      if (this.socket.hasLogger()) {
        this.socket.log(
          'channel',
          `timeout ${this.topic} (${this.joinRef()})`,
          this.joinPush.timeout,
        )
      }
      const leavePush = new Push(this, CHANNEL_EVENTS.leave, closure({}), this.timeout)
      leavePush.send()
      this.state = CHANNEL_STATES.errored
      this.joinPush.reset()
      if (this.socket.isConnected()) {
        this.rejoinTimer.scheduleTimeout()
      }
    })

    // Re-dispatch replies under a per-ref event name (see replyEventName).
    this.on(CHANNEL_EVENTS.reply, (payload, ref) => {
      this.trigger(this.replyEventName(ref), payload)
    })
  }

  /**
   * Join the channel
   *
   * @param {integer} timeout - stored as the channel timeout; defaults to the current one
   * @returns {Push} the join push
   * @throws {Error} when called more than once on the same instance
   */
  join(timeout = this.timeout) {
    if (this.joinedOnce) {
      throw new Error(
        'tried to join multiple times. \'join\' can only be called a single time per channel instance',
      )
    }
    this.timeout = timeout
    this.joinedOnce = true
    this.rejoin()
    return this.joinPush
  }

  /**
   * Hook into channel close
   *
   * @param {Function} callback
   * @returns {integer} ref that can be passed to `off()` to remove this hook only
   */
  onClose(callback) {
    return this.on(CHANNEL_EVENTS.close, callback)
  }

  /**
   * Hook into channel errors
   *
   * @param {Function} callback - invoked with the error reason only
   * @returns {integer} ref that can be passed to `off()` to remove this hook only
   */
  onError(callback) {
    return this.on(CHANNEL_EVENTS.error, reason => callback(reason))
  }

  /**
   * Subscribes on channel events
   *
   * Subscription returns a ref counter, which can be used later to
   * unsubscribe the exact event listener
   *
   * @example
   * const ref1 = channel.on("event", do_stuff)
   * const ref2 = channel.on("event", do_other_stuff)
   * channel.off("event", ref1)
   * // Since unsubscription, do_stuff won't fire,
   * // while do_other_stuff will keep firing on the "event"
   *
   * @param {string} event
   * @param {Function} callback
   * @returns {integer} ref
   */
  on(event, callback) {
    const ref = this.bindingRef++
    this.bindings.push({ event, ref, callback })
    return ref
  }

  /**
   * Unsubscribes off of channel events
   *
   * Use the ref returned from a channel.on() to unsubscribe one
   * handler, or pass nothing for the ref to unsubscribe all
   * handlers for the given event.
   *
   * @example
   * // Unsubscribe the do_stuff handler
   * const ref1 = channel.on("event", do_stuff)
   * channel.off("event", ref1)
   *
   * // Unsubscribe all handlers from event
   * channel.off("event")
   *
   * @param {string} event
   * @param {integer} ref
   */
  off(event, ref) {
    this.bindings = this.bindings.filter((bind) => {
      return !(bind.event === event && (typeof ref === 'undefined' || ref === bind.ref))
    })
  }

  /**
   * True when the socket is connected and the channel is joined.
   *
   * @private
   */
  canPush() {
    return this.socket.isConnected() && this.isJoined()
  }

  /**
   * Sends a message `event` to server with the payload `payload`.
   * Server receives this in the `handle_in(event, payload, socket)`
   * function. If the server replies or the push times out, the reply
   * can optionally be received.
   *
   * When the channel cannot push yet, the push is buffered (with its
   * timeout already started) and sent once the join is acknowledged.
   *
   * @example
   * channel.push("event")
   *   .receive("ok", payload => console.log("server replied:", payload))
   *   .receive("error", err => console.log("server errored", err))
   *   .receive("timeout", () => console.log("timed out pushing"))
   * @param {string} event
   * @param {Object} payload - `{}` is used when falsy
   * @param {number} [timeout] - defaults to the channel timeout
   * @returns {Push}
   * @throws {Error} when called before `join()`
   */
  push(event, payload, timeout = this.timeout) {
    const data = payload || {}
    if (!this.joinedOnce) {
      throw new Error(
        `tried to push '${event}' to '${this.topic}' before joining. Use channel.join() before pushing events`,
      )
    }
    const pushEvent = new Push(this, event, () => data, timeout)
    if (this.canPush()) {
      pushEvent.send()
    } else {
      pushEvent.startTimeout()
      this.pushBuffer.push(pushEvent)
    }
    return pushEvent
  }

  /** Leaves the channel
   *
   * Unsubscribes from server events, and
   * instructs channel to terminate on server
   *
   * Triggers onClose() hooks
   *
   * To receive leave acknowledgements, use the `receive`
   * hook to bind to the server ack, ie:
   *
   * @example
   * channel.leave().receive("ok", () => alert("left!") )
   *
   * @param {integer} timeout
   * @returns {Push}
   */
  leave(timeout = this.timeout) {
    this.rejoinTimer.reset()
    this.joinPush.cancelTimeout()
    this.state = CHANNEL_STATES.leaving

    const onClose = () => {
      if (this.socket.hasLogger()) {
        this.socket.log('channel', `leave ${this.topic}`)
      }
      this.trigger(CHANNEL_EVENTS.close, 'leave')
    }

    const leavePush = new Push(this, CHANNEL_EVENTS.leave, closure({}), timeout)
    leavePush.receive('ok', () => onClose()).receive('timeout', () => onClose())
    leavePush.send()
    // NOTE(review): state was set to `leaving` above, so unless send()
    // changes it, canPush() is false here and 'ok' is triggered locally
    // right away. Kept as-is because callers may rely on the immediate close.
    if (!this.canPush()) {
      leavePush.trigger('ok', {})
    }
    return leavePush
  }

  /**
   * Overridable message hook
   *
   * Receives all events for specialized message handling
   * before dispatching to the channel callbacks. `trigger()` also passes
   * the join ref as a fourth argument.
   *
   * Must return the payload, modified or unmodified
   * @param {string} _event
   * @param {Object} payload
   * @param {integer} _ref
   * @returns {Object}
   */
  onMessage(_event, payload, _ref) {
    return payload
  }

  /**
   * Decides whether an incoming message belongs to this channel: the topic
   * must match, and a provided joinRef must equal the current join ref
   * (otherwise the message is logged as outdated and rejected).
   *
   * @private
   */
  isMember(topic, event, payload, joinRef) {
    if (this.topic !== topic) {
      return false
    }
    if (!joinRef || joinRef === this.joinRef()) {
      return true
    }
    if (this.socket.hasLogger()) {
      this.socket.log('channel', 'dropping outdated message', {
        topic,
        event,
        payload,
        joinRef,
      })
    }
    return false
  }

  /**
   * Ref of the current join push.
   *
   * @private
   */
  joinRef() {
    return this.joinPush.ref
  }

  /**
   * Re-sends the join push unless the channel is leaving.
   *
   * @private
   */
  rejoin(timeout = this.timeout) {
    if (this.isLeaving()) {
      return
    }
    this.socket.leaveOpenTopic(this.topic)
    this.state = CHANNEL_STATES.joining
    this.joinPush.resend(timeout)
  }

  /**
   * Runs the payload through `onMessage()` and invokes every binding
   * registered for `event` with (payload, ref, joinRef). The bindings are
   * snapshotted first, so handlers added or removed during dispatch do not
   * affect the current round.
   *
   * @private
   * @throws {Error} when `onMessage()` returns a falsy value for a truthy payload
   */
  trigger(event, payload, ref, joinRef) {
    const handledPayload = this.onMessage(event, payload, ref, joinRef)
    if (payload && !handledPayload) {
      throw new Error(
        'channel onMessage callbacks must return the payload, modified or unmodified',
      )
    }
    const eventBindings = this.bindings.filter(bind => bind.event === event)
    for (const bind of eventBindings) {
      bind.callback(handledPayload, ref, joinRef || this.joinRef())
    }
  }

  /**
   * Event name under which the reply for push `ref` is dispatched.
   *
   * @private
   */
  replyEventName(ref) {
    return `chan_reply_${ref}`
  }

  /**
   * @private
   */
  isClosed() {
    return this.state === CHANNEL_STATES.closed
  }

  /**
   * @private
   */
  isErrored() {
    return this.state === CHANNEL_STATES.errored
  }

  /**
   * @private
   */
  isJoined() {
    return this.state === CHANNEL_STATES.joined
  }

  /**
   * @private
   */
  isJoining() {
    return this.state === CHANNEL_STATES.joining
  }

  /**
   * @private
   */
  isLeaving() {
    return this.state === CHANNEL_STATES.leaving
  }
}