BaseNode

Base methods for synchronization nodes. Client and server nodes are based on this module.

Property             Type           Description
connection           Connection     Connection to remote node.
log                  Log            Logux log instance to be synchronized.
nodeId               string         Unique current machine name.
options              object?        Synchronization options.
options.auth         authCallback?  Function to check client credentials.
options.credentials  object?        Client credentials. For example, access token.
options.fixTime      boolean?       Detect the time difference between client and server and fix time in synchronized actions.
options.inFilter     filter?        Function to filter actions from remote node. Best place for access control.
options.inMap        mapper?        Map function to change remote node’s action before putting it into the current log.
options.outFilter    filter?        Filter function to select actions for synchronization.
options.outMap       mapper?        Map function to change action before sending it to remote client.
options.ping         number?        Milliseconds since last message to test connection by sending ping.
options.subprotocol  string?        Application subprotocol version in SemVer format.
options.timeout      number?        Timeout in milliseconds to wait for an answer before disconnecting.

BaseNode#authenticated

Type: boolean.

Whether remote node authentication has finished.

BaseNode#connected

Type: boolean.

Whether synchronization is in progress.

node.on('disconnect', () => {
  node.connected //=> false
})

BaseNode#connection

Type: Connection.

Connection used to communicate to remote node.

BaseNode#lastReceived

Type: number.

The latest added time from the remote node’s log that was successfully synchronized. It will be saved in the log store.

BaseNode#lastSent

Type: number.

The latest added time from the current log that was successfully synchronized. It will be saved in the log store.

BaseNode#localNodeId

Type: string.

Unique current machine name.

console.log(node.localNodeId + ' is started')

BaseNode#localProtocol

Type: number.

Version of the Logux protocol used by this node.

if (tool.node.localProtocol !== 1) {
  throw new Error('Unsupported Logux protocol')
}

BaseNode#log

Type: Log.

Log for synchronization.

BaseNode#minProtocol

Type: number.

Minimum version of Logux protocol, which is supported.

console.log(`You need Logux protocol ${node.minProtocol} or higher`)

BaseNode#options

Type: object.

Synchronization options.

BaseNode#remoteNodeId

Type: string | undefined.

Unique name of remote machine. It is undefined until nodes handshake.

console.log('Connected to ' + node.remoteNodeId)

BaseNode#remoteProtocol

Type: number | undefined.

Version of the Logux protocol used by the remote node.

if (node.remoteProtocol >= 5) {
  useNewAPI()
} else {
  useOldAPI()
}

BaseNode#remoteSubprotocol

Type: string | undefined.

Remote node’s application subprotocol version in SemVer format.

It is undefined until the nodes handshake. If the remote node does not send its subprotocol during the handshake, it will be set to 0.0.0.

if (semver.satisfies(node.remoteSubprotocol, '>= 5.0.0')) {
  useNewAPI()
} else {
  useOldAPI()
}

BaseNode#state

Type: "disconnected" | "connecting" | "sending" | "synchronized".

Current synchronization state.

  • disconnected: no connection.
  • connecting: connection was started and we are waiting for the node’s answer.
  • sending: new actions were sent, waiting for an answer.
  • synchronized: all actions were synchronized and we keep the connection.

node.on('state', () => {
  if (node.state === 'sending') {
    console.log('Do not close browser')
  }
})

BaseNode#catch(listener)

Disable throwing an error on an error message and create an error listener.

Property  Type           Description
listener  errorListener  The listener function.

node.catch(error => {
  console.error(error)
})

BaseNode#destroy()

Shut down the connection and unsubscribe from log events.

connection.on('disconnect', () => {
  server.destroy()
})

BaseNode#on(event, listener)

Subscribe to synchronization events. It implements the nanoevents API. Supported events:

  • state: synchronization state was changed.
  • connect: custom check before node authentication. You can throw a LoguxError to send an error to the remote node.
  • error: synchronization error was raised.
  • clientError: when an error was sent to the remote node.
  • debug: when debug information was received from the remote node.

Property  Type                                                     Description
event     "state" | "connect" | "error" | "clientError" | "debug"  Event name.
listener  listener                                                 The listener function.

Returns function. Unbind listener from event.

node.on('clientError', error => {
  logError(error)
})

BaseNode#waitFor(state)

Return a Promise that resolves when synchronization reaches the given state.

If the node is already in that state, the method returns a resolved Promise.

Property  Type    Description
state     string  The expected synchronization state value.

Returns Promise. Resolves when the node reaches the given state.

await node.waitFor('synchronized')
console.log('Everything is synchronized')
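
The behaviour described for waitFor can be sketched on top of the state event. This is only an illustration: MiniNode and its setState method are hypothetical stand-ins and not the Logux implementation.

```javascript
// Hypothetical stand-in for a node: it only tracks `state` and its listeners.
class MiniNode {
  constructor () {
    this.state = 'disconnected'
    this.listeners = []
  }

  // Subscribe to state changes; returns a function that unbinds the listener.
  on (event, listener) {
    if (event !== 'state') throw new Error('Only "state" is supported in this sketch')
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter(i => i !== listener)
    }
  }

  // Assumed helper for this sketch: change state and notify listeners.
  setState (state) {
    if (this.state === state) return
    this.state = state
    for (const listener of [...this.listeners]) listener()
  }

  // Resolve immediately if the state already matches, otherwise wait for it.
  waitFor (state) {
    if (this.state === state) return Promise.resolve()
    return new Promise(resolve => {
      const unbind = this.on('state', () => {
        if (this.state === state) {
          unbind()
          resolve()
        }
      })
    })
  }
}

const node = new MiniNode()
node.waitFor('synchronized').then(() => {
  console.log('Everything is synchronized')
})
node.setState('connecting')
node.setState('synchronized')
```

The listener unbinds itself once the expected state is reached, so repeated calls do not accumulate subscriptions.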

ChannelContext

Subscription context.

Extends Context.

server.channel('user/:id', {
  access (ctx, action, meta) {
    return ctx.params.id === ctx.userId
  }
})

ChannelContext#clientId

Type: string.

Unique persistent client ID.

server.clientIds[ctx.clientId]

ChannelContext#data

Type: object.

Open structure to save some data between different steps of processing.

server.type('RENAME', {
  access (ctx, action, meta) {
    ctx.data.user = findUser(ctx.userId)
    return ctx.data.user.hasAccess(action.projectId)
  },
  process (ctx, action, meta) {
    return ctx.data.user.rename(action.projectId, action.name)
  }
})

ChannelContext#isServer

Type: boolean.

Whether the action was created by the Logux server.

access: (ctx, action, meta) => ctx.isServer

ChannelContext#nodeId

Type: string.

Unique node ID.

server.nodeIds[ctx.nodeId]

ChannelContext#params

Type: object.

Parsed variable parts of channel name.

server.channel('user/:id', {
  access (ctx, action, meta) {
    action.channel //=> user/10
    ctx.params //=> { id: '10' }
  }
})
server.channel(/post\/(\d+)/, {
  access (ctx, action, meta) {
    action.channel //=> post/10
    ctx.params //=> ['post/10', '10']
  }
})
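
How the two kinds of patterns could produce the params shown above can be sketched as follows. matchChannel is a hypothetical helper written for this illustration; the real server may parse patterns differently.

```javascript
// Hypothetical helper: match a channel name against a string pattern or a RegExp.
function matchChannel (pattern, channel) {
  if (pattern instanceof RegExp) {
    // For RegExp patterns, params is the raw match array.
    return channel.match(pattern)
  }
  // For string patterns, every ":name" part becomes a named parameter.
  const names = []
  const source = pattern
    .split('/')
    .map(part => {
      if (part.startsWith(':')) {
        names.push(part.slice(1))
        return '([^/]+)'
      }
      // Escape RegExp special characters in literal parts.
      return part.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    })
    .join('/')
  const match = channel.match(new RegExp('^' + source + '$'))
  if (!match) return null
  const params = {}
  names.forEach((name, index) => {
    params[name] = match[index + 1]
  })
  return params
}

console.log(matchChannel('user/:id', 'user/10'))
console.log(matchChannel(/post\/(\d+)/, 'post/10'))
```

Note that parameter values are always strings, which matters when comparing them with numeric IDs.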

ChannelContext#subprotocol

Type: string | undefined.

Action creator application subprotocol version in SemVer format. Use Context#isSubprotocol to check it.

ChannelContext#userId

Type: string | undefined.

User ID taken from the node ID.

async access (ctx, action, meta) {
  const user = await db.getUser(ctx.userId)
  return user.admin
}

ChannelContext#isSubprotocol(range)

Check the creator’s subprotocol version. It uses the semver npm package to parse requirements.

Property  Type    Description
range     string  npm’s version requirements.

Returns boolean. Whether the version satisfies the requirements.

if (ctx.isSubprotocol('2.x')) {
  useOldAPI()
}

ChannelContext#sendBack(action, meta = {})

Send action back to the client.

Property  Type    Description
action    Action  The action.
meta      Meta?   Action’s meta.

Returns Promise. Resolves when the action has been added to the server log.

ctx.sendBack({ type: 'login/success', token })

Connection

Abstract interface for a connection used to synchronize logs. For example, WebSocket or Loopback.

Connection#connected

Type: boolean.

Whether the connection is enabled.

Connection#connect()

Start the connection. A connection should begin in the disconnected state and start connecting only when this method is called.

This method can be called again if the connection has moved to the disconnected state.

Returns Promise. Resolves when the connection is established.

Connection#disconnect(reason?)

Finish the current connection.

After disconnection, the connection can be started again by Connection#connect.

Property  Type                                Description
reason    ("error" | "timeout" | "destroy")?  Disconnection reason. It will not be sent to the other machine.

Connection#on(event, listener)

Subscribe to connection events. It implements the nanoevents API. Supported events:

  • connecting: connection establishing was started.
  • connect: connection was established by any side.
  • disconnect: connection was closed by any side.
  • message: message was received from the remote node.
  • error: error during connection, sending or receiving.

Property  Type                                                            Description
event     "connecting" | "connect" | "disconnect" | "message" | "error"  Event.
listener  function                                                        The listener function.

Returns function. Unbind listener from event.

Connection#send(message)

Send message to connection.

Property  Type     Description
message   Message  The message to be sent.

Context

Action context.

server.type('FOO', {
  access (ctx, action, meta) {
    return ctx.isSubprotocol('3.x') ? check3(action) : check4(action)
  }
})

Context#clientId

Type: string.

Unique persistent client ID.

server.clientIds[ctx.clientId]

Context#data

Type: object.

Open structure to save some data between different steps of processing.

server.type('RENAME', {
  access (ctx, action, meta) {
    ctx.data.user = findUser(ctx.userId)
    return ctx.data.user.hasAccess(action.projectId)
  },
  process (ctx, action, meta) {
    return ctx.data.user.rename(action.projectId, action.name)
  }
})

Context#isServer

Type: boolean.

Whether the action was created by the Logux server.

access: (ctx, action, meta) => ctx.isServer

Context#nodeId

Type: string.

Unique node ID.

server.nodeIds[ctx.nodeId]

Context#subprotocol

Type: string | undefined.

Action creator application subprotocol version in SemVer format. Use Context#isSubprotocol to check it.

Context#userId

Type: string | undefined.

User ID taken from the node ID.

async access (ctx, action, meta) {
  const user = await db.getUser(ctx.userId)
  return user.admin
}

Context#isSubprotocol(range)

Check the creator’s subprotocol version. It uses the semver npm package to parse requirements.

Property  Type    Description
range     string  npm’s version requirements.

Returns boolean. Whether the version satisfies the requirements.

if (ctx.isSubprotocol('2.x')) {
  useOldAPI()
}

Context#sendBack(action, meta = {})

Send action back to the client.

Property  Type    Description
action    Action  The action.
meta      Meta?   Action’s meta.

Returns Promise. Resolves when the action has been added to the server log.

ctx.sendBack({ type: 'login/success', token })

LocalPair

Two paired loopback connections.

Property  Type     Description
delay     number?  Delay for connection and send events.

import { LocalPair, ClientNode, ServerNode } from 'logux-core'
const pair = new LocalPair()
const client = new ClientNode(pair.left)
const server = new ServerNode(pair.right)

LocalPair#delay

Type: number.

Delay for connection and send events to emulate real connection latency.

LocalPair#left

Type: Connection.

First connection. Will be connected to LocalPair#right one after Connection#connect.

new ClientNode(pair.left)

LocalPair#right

Type: Connection.

Second connection. Will be connected to LocalPair#left one after Connection#connect.

new ServerNode(pair.right)
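
The idea of two paired loopback connections with a delay can be sketched without any network. LoopbackSide and createPair are hypothetical names for this illustration; disconnect and error handling are left out, and the real LocalPair may behave differently.

```javascript
// Hypothetical one-side connection: delivers messages to `other` after a delay.
class LoopbackSide {
  constructor (delay) {
    this.delay = delay
    this.connected = false
    this.other = null
    this.handlers = {}
  }

  // Subscribe to an event; returns a function that unbinds the listener.
  on (event, listener) {
    const list = this.handlers[event] || (this.handlers[event] = [])
    list.push(listener)
    return () => {
      this.handlers[event] = this.handlers[event].filter(i => i !== listener)
    }
  }

  emit (event, ...args) {
    for (const listener of this.handlers[event] || []) listener(...args)
  }

  // Connecting one side connects the paired side too.
  connect () {
    this.emit('connecting')
    return new Promise(resolve => {
      setTimeout(() => {
        this.connected = this.other.connected = true
        this.other.emit('connect')
        this.emit('connect')
        resolve()
      }, this.delay)
    })
  }

  send (message) {
    if (!this.connected) throw new Error('Connect before sending a message')
    setTimeout(() => this.other.emit('message', message), this.delay)
  }
}

// Create two sides that point at each other.
function createPair (delay = 1) {
  const left = new LoopbackSide(delay)
  const right = new LoopbackSide(delay)
  left.other = right
  right.other = left
  return { left, right, delay }
}

const pair = createPair(10)
pair.right.on('message', message => console.log('right received', message))
pair.left.connect().then(() => pair.left.send(['ping', 1]))
```

The delay emulates latency in both the connection step and message delivery, which is what makes such a pair useful in tests.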

Log

Stores actions with time marks. The log is the main idea in Logux. In most end-user tools you will work with the log and should know its API.

Property     Type             Description
opts         object           Options.
opts.nodeId  string | number  Unique current machine name.
opts.store   Store            Store for log.

import Log from 'logux-core/log'
import { MemoryStore } from 'logux-core'
const log = new Log({
  store: new MemoryStore(),
  nodeId: 'client:134'
})

log.on('add', beeper)
log.add({ type: 'beep' })

Log#nodeId

Type: string | number.

Unique node ID. It is used in action IDs.

Log#add(action, meta?)

Add action to log.

It will set the id and time properties (if they were missing) and the added property in meta, and then call all listeners.

Property  Type    Description
action    Action  The new action.
meta      Meta?   Open structure for action metadata.

Returns Promise<Meta | false>. Promise with meta if action was added to log or false if action was already in log.

removeButton.addEventListener('click', () => {
  log.add({ type: 'users:remove', user: id })
})
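
The behaviour described for add (fill missing id and time, set added, resolve to false on duplicates) can be sketched with a small in-memory class. MiniLog and its ID format are assumptions made for this illustration only; listeners are omitted.

```javascript
// Hypothetical in-memory log that mimics the described behaviour of add().
class MiniLog {
  constructor (nodeId) {
    this.nodeId = nodeId
    this.entries = []
    this.lastAdded = 0
    this.counter = 0
  }

  // Assumed ID format for this sketch only: "<time> <nodeId> <counter>".
  generateId () {
    this.counter += 1
    return `${Date.now()} ${this.nodeId} ${this.counter}`
  }

  async add (action, meta = {}) {
    // Fill id and time only when the caller did not provide them.
    if (typeof meta.id === 'undefined') meta.id = this.generateId()
    if (typeof meta.time === 'undefined') meta.time = Date.now()
    // An action with a known ID is not added twice.
    if (this.entries.some(entry => entry[1].id === meta.id)) return false
    this.lastAdded += 1
    meta.added = this.lastAdded
    this.entries.push([action, meta])
    return meta
  }
}

const log = new MiniLog('client:134')
log.add({ type: 'beep' }).then(meta => {
  console.log(meta.id, meta.added)
})
```

Because the result is either meta or false, callers can distinguish a new action from a repeated one without a separate lookup.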

Log#byId(id)

Find an action in the log by its ID.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | Nope>. Promise with entry.

if (action.type === 'logux/undo') {
  const [undidAction, undidMeta] = await log.byId(action.id)
  log.changeMeta(meta.id, { reasons: undidMeta.reasons })
}

Log#changeMeta(id, diff)

Change action metadata. Setting reasons: [] removes the action from the log.

Property  Type    Description
diff      object  Object with values to change in action metadata.
id        string  Action ID.

Returns Promise<boolean>. Promise with true if metadata was changed or false on unknown ID.

await process(action)
log.changeMeta(meta.id, { status: 'processed' })

Log#each(opts?, callback)

Iterates through all actions, from last to first.

Return false from callback if you want to stop iteration.

Property  Type      Description
callback  iterator  Function that will be executed on every action.
opts      object?   Iterator options.

Returns Promise. Resolves when iteration is stopped by the callback or reaches the end of the log.

log.each((action, meta) => {
  if (compareTime(meta.id, lastBeep) <= 0) {
    return false;
  } else if (action.type === 'beep') {
    beep()
    lastBeep = meta.id
    return false;
  }
})
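
The iteration contract (newest first, stop when the callback returns false) can be sketched over a plain array. The each function below is a hypothetical stand-alone helper, not the Log method itself, and it ignores iterator options.

```javascript
// Hypothetical iterator over an array of [action, meta] entries
// that is ordered from oldest to newest.
async function each (entries, callback) {
  for (let i = entries.length - 1; i >= 0; i--) {
    const [action, meta] = entries[i]
    // Stop as soon as the callback explicitly returns false.
    if (callback(action, meta) === false) break
  }
}

const history = [
  [{ type: 'beep' }, { id: '1' }],
  [{ type: 'other' }, { id: '2' }]
]
each(history, (action, meta) => {
  console.log(meta.id, action.type)
})
```

Only an explicit false stops the loop; a callback that returns nothing continues to the next action.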

Log#generateId()

Generate next unique action ID.

Returns string. Unique action ID.

const id = log.generateId()

Log#on(event, listener)

Subscribe to log events. It implements the nanoevents API. Supported events:

  • preadd: when somebody tries to add an action to the log. It fires before the ID check. The best place to add a reason.
  • add: when a new action was added to the log.
  • clean: when an action was cleaned from the store.

Property  Type                        Description
event     "preadd" | "add" | "clean"  The event name.
listener  listener                    The listener function.

Returns function. Unbind listener from event.

const unbind = log.on('add', (action, meta) => {
  if (action.type === 'beep') beep()
})
function disableBeeps () {
  unbind()
}

Log#removeReason(reason, criteria?)

Remove reason tag from action’s metadata and remove actions without reason from log.

Property  Type     Description
criteria  object?  Actions criteria.
reason    string   Reason’s name.

Returns Promise. Resolves when cleaning is finished.

onSync(lastSent) {
  log.removeReason('unsynchronized', { maxAdded: lastSent })
}
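
The effect of removing a reason can be sketched on a plain array of entries. This removeReason function is a hypothetical helper for illustration; it supports only the maxAdded criterion used in the example above and mutates the given meta objects.

```javascript
// Hypothetical helper working on an array of [action, meta] entries.
// Only the `maxAdded` criterion is sketched here.
function removeReason (entries, reason, criteria = {}) {
  const kept = []
  const removed = []
  for (const [action, meta] of entries) {
    const matches =
      typeof criteria.maxAdded === 'undefined' || meta.added <= criteria.maxAdded
    if (matches && meta.reasons.includes(reason)) {
      meta.reasons = meta.reasons.filter(i => i !== reason)
      // An action without any reason left is removed from the log.
      if (meta.reasons.length === 0) {
        removed.push([action, meta])
        continue
      }
    }
    kept.push([action, meta])
  }
  return { kept, removed }
}

const entries = [
  [{ type: 'a' }, { id: '1', added: 1, reasons: ['unsynchronized'] }],
  [{ type: 'b' }, { id: '2', added: 2, reasons: ['unsynchronized', 'pinned'] }],
  [{ type: 'c' }, { id: '3', added: 3, reasons: ['unsynchronized'] }]
]
const result = removeReason(entries, 'unsynchronized', { maxAdded: 2 })
console.log(result.kept.map(entry => entry[0].type))
```

Here the first action loses its only reason and is removed, the second keeps its other reason, and the third is outside the maxAdded criterion and stays untouched.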

LoguxError

Logux error in logs synchronization.

Property  Type      Description
options   any       The error option.
received  boolean?  Was error received from remote node.
type      string    The error code.

if (error.name === 'LoguxError') {
  console.log('Server throws: ' + error.description)
}

LoguxError.describe(type, options)

Return an error description by its code.

Property  Type    Description
options   any     The error options, which depend on the error code.
type      string  The error code.

Returns string. Human-readable error description.

errorMessage(msg) {
  console.log(LoguxError.describe(msg[1], msg[2]))
}

LoguxError#description

Type: string.

Human-readable error description.

console.log('Server throws: ' + error.description)

LoguxError#name

Type: string.

Always equal to LoguxError. The best way to check error class.

if (error.name === 'LoguxError') { }

LoguxError#options

Type: any.

Error options depend on the error type.

if (error.type === 'timeout') {
  console.error('A timeout was reached (' + error.options + ' ms)')
}

LoguxError#received

Type: boolean.

Whether the error was received from the remote client.

LoguxError#type

Type: string.

The error code.

if (error.type === 'timeout') {
  fixNetwork()
}

MemoryStore

Simple memory-based log store.

It is good for tests, but not for server or client usage, because it stores all data in memory and will lose the log on exit.

Extends Store.

import Log from 'logux-core/log'
import { MemoryStore } from 'logux-core'

const log = new Log({
  nodeId: 'server',
  store: new MemoryStore()
})

MemoryStore#add(action, meta)

Add an action to the store. The action will always have a type property.

Property  Type    Description
action    Action  The action to add.
meta      Meta    Action’s metadata.

Returns Promise<Meta | false>. Promise with meta for new action or false if action with same meta.id was already in store.

MemoryStore#byId(id)

Return action by action ID.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | Nope>. Promise with array of action and metadata.

MemoryStore#changeMeta(id, diff)

Change action metadata.

Property  Type    Description
diff      object  Object with values to change in action metadata.
id        string  Action ID.

Returns Promise<boolean>. Promise with true if metadata was changed or false on unknown ID.

MemoryStore#clean()

Remove all data from the store.

Returns Promise. Promise when cleaning will be finished.

MemoryStore#get(opts)

Return a Promise with the first Page. A Page object has an entries property with a part of the actions and a next property with a function to load the next page. On the last page, the next property should be empty.

This paginated API is used because the log could be very big, so entries are loaded page by page instead of keeping all of them in memory at once.

Property  Type    Description
opts      object  Query options.

Returns Promise<Page>. Promise with first Page.
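
Consuming such a paginated result can be sketched as follows. getPage, readAll and the pageSize parameter are hypothetical names for this illustration; a real store decides its own page size and query options.

```javascript
// Hypothetical paged reader over an in-memory array of entries.
// `pageSize` is an assumption of this sketch, not a documented option.
function getPage (entries, offset = 0, pageSize = 2) {
  const end = offset + pageSize
  const page = { entries: entries.slice(offset, end) }
  if (end < entries.length) {
    // `next` loads the following page; it is absent on the last page.
    page.next = () => Promise.resolve(getPage(entries, end, pageSize))
  }
  return page
}

// Walk through all pages, one page at a time.
async function readAll (firstPagePromise) {
  const all = []
  let page = await firstPagePromise
  while (true) {
    all.push(...page.entries)
    if (!page.next) break
    page = await page.next()
  }
  return all
}

readAll(Promise.resolve(getPage(['a', 'b', 'c', 'd', 'e']))).then(all => {
  console.log(all.length)
})
```

A caller that only needs recent entries can stop after the first page and never call next.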

MemoryStore#getLastAdded()

Return the biggest added number in the store. All actions in this log have the same or a smaller added time.

Returns Promise<number>. Promise with biggest added number.

MemoryStore#getLastSynced()

Get added values for the latest synchronized received and sent events.

Returns Promise<LastSynced>. Promise with LastSynced.

MemoryStore#remove(id)

Remove action from store.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | false>. Promise with entry if action was in store.

MemoryStore#removeReason(reason, criteria, callback)

Remove reason from action’s metadata and remove actions without reasons.

Property  Type      Description
callback  listener  Callback for every removed action.
criteria  object    Actions criteria.
reason    string    The reason name.

Returns Promise. Resolves when cleaning is finished.

MemoryStore#setLastSynced(values)

Set the added value for the latest synchronized received and/or sent events.

Property  Type        Description
values    LastSynced  Object with latest sent or received values.

Returns Promise. Resolves when the values are saved to the store.


Server

End-user API to create a Logux server.

Property              Type                             Description
opts                  object                           Server options.
opts.backend          string?                          URL to PHP, Ruby on Rails, or other backend to process actions and authentication.
opts.bunyan           BunyanLogger?                    Bunyan logger with custom settings.
opts.cert             string?                          SSL certificate or path to it. Path could be relative from server root. It is required in production mode, because WSS is highly recommended.
opts.controlHost      number?                          Host to bind HTTP server to control Logux server.
opts.controlPassword  string?                          Password to control the server.
opts.controlPort      number?                          Port to control the server.
opts.env              ("production" | "development")?  Development or production server mode. By default, it will be taken from NODE_ENV environment variable. On empty NODE_ENV it will be "development".
opts.host             string?                          IP-address to bind server.
opts.key              string?                          SSL key or path to it. Path could be relative from server root. It is required in production mode, because WSS is highly recommended.
opts.ping             number?                          Milliseconds since last message to test connection by sending ping.
opts.port             number?                          Port to bind server. It will create HTTP server manually to connect WebSocket server to it.
opts.redis            string?                          URL to Redis for Logux Server Pro scaling.
opts.reporter         ("human" | "json" | function)?   Report process/errors to CLI in bunyan JSON or in human readable format. It can be also a function to show current server status.
opts.root             string?                          Application root to load files and show errors.
opts.server           http.Server?                     HTTP server to connect WebSocket server to it. Same as in ws.Server.
opts.store            Store?                           Store to save log. Will be MemoryStore, by default.
opts.subprotocol      string                           Server current application subprotocol version in SemVer format.
opts.supports         string                           npm’s version requirements for client subprotocol version.
opts.timeout          number?                          Timeout in milliseconds to disconnect connection.

const { Server } = require('@logux/server')

const env = process.env.NODE_ENV || 'development'
const envOptions = {}
if (env === 'production') {
  envOptions.cert = 'cert.pem'
  envOptions.key = 'key.pem'
}

const server = new Server(Object.assign({
  subprotocol: '1.0.0',
  supports: '1.x || 0.x',
  root: __dirname
}, envOptions))

server.listen()

Server.loadOptions(process, defaults)

Load options from command-line arguments and/or environment variables.

Property  Type    Description
defaults  object  Default server options. Arguments and environment variables will override them.
process   object  Current process object.

Returns object. Parsed options object.

const server = new Server(Server.loadOptions(process, {
  subprotocol: '1.0.0',
  supports: '1.x',
  root: __dirname,
  port: 31337
}))

Server#connected

Type: ServerClient[].

Connected clients.

for (let i in server.connected) {
  console.log(server.connected[i].remoteAddress)
}

Server#env

Type: "production" | "development".

Production or development mode.

if (server.env === 'development') {
  logDebugData()
}

Server#log

Type: Log.

Server actions log.

server.log.each(finder)

Server#nodeId

Type: string.

Server unique ID.

console.log('Error was raised on ' + server.nodeId)

Server#options

Type: object.

Server options.

console.log('Server options', server.options.subprotocol)

Server#addClient(connection)

Add a new client to the server. You should call this method manually mostly for test purposes.

Property    Type        Description
connection  Connection  Logux connection to client.

Returns number. Client ID.

server.addClient(test.right)

Server#auth(authenticator)

Set the authentication function. It will receive client credentials and node ID. It should return a Promise with true or false.

Property       Type           Description
authenticator  authenticator  The authentication callback.

server.auth(async (userId, token) => {
  const user = await findUserByToken(token)
  return !!user && userId === user.id
})

Server#channel(pattern, callbacks)

Define the channel.

Property   Type             Description
callbacks  object           Callback during subscription process.
pattern    string | RegExp  Pattern or regular expression for channel name.

server.channel('user/:id', {
  access (ctx, action, meta) {
    return ctx.params.id === ctx.userId
  },
  filter (ctx, action, meta) {
    return (otherCtx, otherAction, otherMeta) => {
      return !action.hidden
    }
  },
  async init (ctx, action, meta) {
    const user = await db.loadUser(ctx.params.id)
    ctx.sendBack({ type: 'USER_NAME', name: user.name })
  }
})

Server#debugError(error)

Send runtime error stacktrace to all clients.

Property  Type   Description
error     Error  Runtime error instance.

process.on('uncaughtException', e => {
  server.debugError(e)
})

Server#destroy()

Stop server and unbind all listeners.

Returns Promise. Resolves when all listeners are removed.

afterEach(() => {
  testServer.destroy()
})

Server#listen()

Start WebSocket server and listen for clients.

Returns Promise. When the server has been bound.

Server#on(event, listener)

Subscribe for synchronization events. It implements nanoevents API. Supported events:

  • error: server error during action processing.
  • fatal: server error during loading.
  • clientError: wrong client behaviour.
  • connected: new client was connected.
  • disconnected: client was disconnected.
  • preadd: action is going to be added to the log. The best place to set reasons.
  • add: action was added to the log.
  • clean: action was cleaned from the log.
  • processed: action processing was finished.
  • subscribed: channel initial data was loaded.

Property  Type      Description
event     string    The event name.
listener  listener  The listener function.

Returns function. Unbind listener from event.

server.on('error', error => {
  trackError(error)
})

Server#otherChannel(callbacks)

Set callbacks for unknown channel subscription.

Property   Type    Description
callbacks  object  Callback during subscription process.

server.otherChannel({
  async access (ctx, action, meta) {
    const res = await phpBackend.checkChannel(ctx.params[0], ctx.userId)
    if (res.code === 404) {
      this.wrongChannel(action, meta)
      return false
    } else {
      return res.body === 'granted'
    }
  }
})

Server#otherType(callbacks)

Define callbacks for actions whose type was not defined by any Server#type. Useful for proxy or some hacks.

Without this setting, the server will call Server#unknownType on an unknown type.

Property   Type    Description
callbacks  object  Callbacks for actions with this type.

server.otherType({
  async access (ctx, action, meta) {
    const response = await phpBackend.checkByHTTP(action, meta)
    if (response.code === 404) {
      this.unknownType(action, meta)
      return false
    } else {
      return response.body === 'granted'
    }
  },
  async process (ctx, action, meta) {
    return await phpBackend.sendHTTP(action, meta)
  }
})

Server#sendAction(action, meta)

Send action, received by other server, to all clients of current server. This method is for multi-server configuration only.

Property  Type    Description
action    Action  New action.
meta      Meta    Action’s metadata.

server.on('add', (action, meta) => {
  if (meta.server === server.nodeId) {
    sendToOtherServers(action, meta)
  }
})
onReceivingFromOtherServer((action, meta) => {
  server.sendAction(action, meta)
})

Server#type(name, callbacks)

Define action type’s callbacks.

Property   Type    Description
callbacks  object  Callbacks for actions with this type.
name       string  The action’s type.

server.type('CHANGE_NAME', {
  access (ctx, action, meta) {
    return action.user === ctx.userId
  },
  resend (ctx, action) {
    return { channel: `user/${ action.user }` }
  },
  process (ctx, action, meta) {
    if (isFirstOlder(lastNameChange(action.user), meta)) {
      return db.changeUserName({ id: action.user, name: action.name })
    }
  }
})

Server#undo(meta, reason, extra = {})

Undo action from client.

Property  Type     Description
extra     object?  Extra fields to logux/undo action.
meta      Meta     The action’s metadata.
reason    string   Optional code for reason.

Returns Promise. When action was saved to the log.

if (couldNotFixConflict(action, meta)) {
  server.undo(meta)
}

Server#unknownType(action, meta)

If you receive an action with an unknown type, this method will mark the action with an error status and undo it on the clients.

If you didn’t set Server#otherType, Logux will call it automatically.

Property  Type    Description
action    Action  The action with unknown type.
meta      Meta    Action’s metadata.

server.otherType({
  access (ctx, action, meta) {
    if (action.type.startsWith('myapp/')) {
      return proxy.access(action, meta)
    } else {
      server.unknownType(action, meta)
    }
  }
})

Server#wrongChannel(action, meta)

Report that a client tried to subscribe to an unknown channel.

Logux calls it automatically if you do not set Server#otherChannel.

Property  Type    Description
action    Action  The subscribe action.
meta      Meta    Action’s metadata.

server.otherChannel({
  async access (ctx, action, meta) {
    const res = await phpBackend.checkChannel(ctx.params[0], ctx.userId)
    if (res.code === 404) {
      this.wrongChannel(action, meta)
      return false
    } else {
      return res.body === 'granted'
    }
  }
})

ServerClient

Logux client connected to server.

Property    Type              Description
app         Server            The server.
connection  ServerConnection  The Logux connection.
key         number            Client number used as app.connected key.

const client = server.connected[0]

ServerClient#clientId

Type: string | undefined.

Unique persistent machine ID. It will be undefined before successful authentication.

ServerClient#connection

Type: ServerConnection.

The Logux wrapper to WebSocket connection.

console.log(client.connection.ws.upgradeReq.headers)

ServerClient#key

Type: string.

Client number used as app.connected key.

function stillConnected (client) {
  return !!app.connected[client.key]
}

ServerClient#node

Type: ServerNode.

Node instance to synchronize logs.

if (client.node.state === 'synchronized')

ServerClient#nodeId

Type: string | undefined.

Unique node ID. It will be undefined before successful authentication.

ServerClient#processing

Type: boolean.

Whether the server is processing an action from this client.

console.log('Clients in processing:', clients.map(i => i.processing))

ServerClient#remoteAddress

Type: string.

Client IP address.

const clientCity = detectLocation(client.remoteAddress)

ServerClient#userId

Type: string | undefined.

User ID. It will be filled from the client’s node ID. It will be undefined before successful authentication.

ServerClient#destroy()

Disconnect client.

ServerClient#isSubprotocol(range)

Check the client’s subprotocol version. It uses the semver npm package to parse requirements.

Property  Type    Description
range     string  npm’s version requirements.

Returns boolean. Whether the version satisfies the requirements.

if (client.isSubprotocol('4.x')) {
  useOldAPI()
}

ServerConnection

Logux connection for server WebSocket.

Property  Type       Description
ws        WebSocket  WebSocket instance.

import { ServerConnection, ServerNode } from 'logux-core'
import { Server } from 'ws'

// `wss` is a ws Server instance; `log` and `opts` are created elsewhere.
wss.on('connection', function connection(ws) {
  const connection = new ServerConnection(ws)
  const node = new ServerNode('server', log, connection, opts)
})

ServerNode

Server node in synchronization pair.

Unlike the client node, it does not initiate synchronization and destroys itself on disconnect.

Extends BaseNode.

Property             Type           Description
connection           Connection     Connection to remote node.
log                  Log            Logux log instance to be synchronized.
nodeId               string         Unique current machine name.
options              object?        Synchronization options.
options.auth         authCallback?  Function to check client credentials.
options.credentials  object?        Server credentials. For example, access token.
options.inFilter     filter?        Function to filter actions from client. Best place for permissions control.
options.inMap        mapper?        Map function to change remote node’s action before putting it into the current log.
options.outFilter    filter?        Filter function to select actions for synchronization.
options.outMap       mapper?        Map function to change action before sending it to remote client.
options.ping         number?        Milliseconds since last message to test connection by sending ping.
options.subprotocol  string?        Application subprotocol version in SemVer format.
options.timeout      number?        Timeout in milliseconds to wait for an answer before disconnecting.

import { ServerConnection, ServerNode } from 'logux-core'
startServer(ws => {
  const connection = new ServerConnection(ws)
  const node = new ServerNode('server' + id, log, connection)
})

ServerNode#authenticated

Type: boolean.

Whether remote node authentication has finished.

ServerNode#connected

Type: boolean.

Whether synchronization is in progress.

node.on('disconnect', () => {
  node.connected //=> false
})

ServerNode#connection

Type: Connection.

Connection used to communicate to remote node.

ServerNode#lastReceived

Type: number.

The latest added time from the remote node’s log that was successfully synchronized. It will be saved in the log store.

ServerNode#lastSent

Type: number.

The latest added time from the current log that was successfully synchronized. It will be saved in the log store.

ServerNode#localNodeId

Type: string.

Unique current machine name.

console.log(node.localNodeId + ' is started')

ServerNode#localProtocol

Type: number.

Version of the Logux protocol used by this node.

if (tool.node.localProtocol !== 1) {
  throw new Error('Unsupported Logux protocol')
}

ServerNode#log

Type: Log.

Log for synchronization.

ServerNode#minProtocol

Type: number.

Minimum version of Logux protocol, which is supported.

console.log(`You need Logux protocol ${node.minProtocol} or higher`)

ServerNode#options

Type: object.

Synchronization options.

ServerNode#remoteNodeId

Type: string | undefined.

Unique name of remote machine. It is undefined until nodes handshake.

console.log('Connected to ' + node.remoteNodeId)

ServerNode#remoteProtocol

Type: number | undefined.

Version of the Logux protocol used by the remote node.

if (node.remoteProtocol >= 5) {
  useNewAPI()
} else {
  useOldAPI()
}

ServerNode#remoteSubprotocol

Type: string | undefined.

Remote node’s application subprotocol version in SemVer format.

It is undefined until the nodes handshake. If the remote node does not send its subprotocol during the handshake, it will be set to 0.0.0.

if (semver.satisfies(node.remoteSubprotocol, '>= 5.0.0')) {
  useNewAPI()
} else {
  useOldAPI()
}

ServerNode#state

Type: "disconnected" | "connecting" | "sending" | "synchronized".

Current synchronization state.

  • disconnected: no connection.
  • connecting: connection was started and we are waiting for the node’s answer.
  • sending: new actions were sent, waiting for an answer.
  • synchronized: all actions were synchronized and we keep the connection.

node.on('state', () => {
  if (node.state === 'sending') {
    console.log('Do not close browser')
  }
})

ServerNode#catch(listener)

Disable throwing an error on an error message and create an error listener.

Property  Type           Description
listener  errorListener  The listener function.

node.catch(error => {
  console.error(error)
})

ServerNode#destroy()

Shut down the connection and unsubscribe from log events.

connection.on('disconnect', () => {
  server.destroy()
})

ServerNode#on(event, listener)

Subscribe for synchronization events. It implements nanoevents API. Supported events:

  • state: synchronization state was changed.
  • connect: custom check before node authentication. You can throw a LoguxError to send error to remote node.
  • error: synchronization error was raised.
  • clientError: when error was sent to remote node.
  • debug: when debug information received from remote node.

Property  Type                                                     Description
event     "state" | "connect" | "error" | "clientError" | "debug"  Event name.
listener  listener                                                 The listener function.

Returns function. Unbind listener from event.

node.on('clientError', error => {
  logError(error)
})

ServerNode#waitFor(state)

Return a Promise that resolves when the node reaches a specific state.

If the current state already matches, the method returns a resolved Promise.

Property  Type    Description
state     string  The expected synchronization state value.

Returns Promise. Promise that resolves once the node is in the expected state.

await node.waitFor('synchronized')
console.log('Everything is synchronized')
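
The two cases described above (resolve at once when the state already matches, otherwise wait for a matching state event) can be sketched without Logux itself. TinyNode and setState are hypothetical names used only for illustration; this is a minimal model, not the library's implementation.

```javascript
// Hypothetical stand-in for a node: it keeps a state and notifies listeners.
class TinyNode {
  constructor () {
    this.state = 'disconnected'
    this.listeners = []
  }

  // Subscribe to state changes; returns a function that unbinds the listener.
  on (event, listener) {
    if (event !== 'state') throw new Error('Only "state" is supported here')
    this.listeners.push(listener)
    return () => {
      this.listeners = this.listeners.filter(i => i !== listener)
    }
  }

  // Hypothetical helper to change the state and notify listeners.
  setState (state) {
    if (this.state === state) return
    this.state = state
    for (const listener of [...this.listeners]) listener()
  }

  // Resolve at once if the state already matches, otherwise on the first match.
  waitFor (state) {
    if (this.state === state) return Promise.resolve()
    return new Promise(resolve => {
      const unbind = this.on('state', () => {
        if (this.state === state) {
          unbind()
          resolve()
        }
      })
    })
  }
}
```

The listener unbinds itself after the first match, so repeated waitFor calls do not accumulate listeners.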

Every Store class should provide these standard methods: add, byId, get, remove, changeMeta, removeReason, clean, getLastAdded, getLastSynced, setLastSynced.

See MemoryStore sources for example.

Store#add(action, meta)

Add an action to the store. The action will always have a type property.

Property  Type    Description
action    Action  The action to add.
meta      Meta    Action’s metadata.

Returns Promise<Meta | false>. Promise with meta for new action or false if action with same meta.id was already in store.
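
As a rough illustration of this contract, the sketch below rejects a duplicate meta.id with false and otherwise resolves to the stored meta. TinyStore is a hypothetical class, and having the store assign the added number is an assumption made for this sketch.

```javascript
// Hypothetical in-memory store showing only the add() contract.
class TinyStore {
  constructor () {
    this.entries = [] // Array of [action, meta] pairs
    this.lastAdded = 0 // Biggest `added` number handed out so far
  }

  // Resolves to the stored meta, or to false when meta.id is already known.
  async add (action, meta) {
    if (this.entries.some(([, m]) => m.id === meta.id)) return false
    this.lastAdded += 1
    // Assumption: the store is the one that assigns `added`.
    const stored = { ...meta, added: this.lastAdded }
    this.entries.push([action, stored])
    return stored
  }

  // Resolves to the biggest `added` number in the store.
  async getLastAdded () {
    return this.lastAdded
  }
}
```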

Store#byId(id)

Return action by action ID.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | Nope>. Promise with array of action and metadata.

Store#changeMeta(id, diff)

Change action metadata.

Property  Type    Description
diff      object  Object with values to change in action metadata.
id        string  Action ID.

Returns Promise<boolean>. Promise with true if metadata was changed or false on unknown ID.

Store#clean()

Remove all data from the store.

Returns Promise. Promise that resolves when cleaning is finished.

Store#get(opts)

Return a Promise with the first Page. A Page object has an entries property with part of the actions and a next property with a function to load the next page. On the last page, the next property should be empty.

This tricky API is used because the log could be very big, so pagination is needed to avoid keeping all actions in memory at once.

Property  Type    Description
opts      object  Query options.

Returns Promise<Page>. Promise with first Page.
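
A consumer of this API typically loops until a page has no next function. The sketch below shows one way to do that; paginate and collect are hypothetical helpers written for this illustration and are not part of Logux.

```javascript
// Hypothetical helper: split entries into pages that follow the Page shape.
function paginate (entries, size) {
  const page = { entries: entries.slice(0, size) }
  const rest = entries.slice(size)
  // Only non-final pages get a `next` function.
  if (rest.length > 0) {
    page.next = () => Promise.resolve(paginate(rest, size))
  }
  return page
}

// Walk every page, starting from the Promise returned by a get()-like call.
async function collect (firstPage) {
  const all = []
  let page = await firstPage
  while (page) {
    all.push(...page.entries)
    page = page.next ? await page.next() : undefined
  }
  return all
}
```

Note that collect gathers everything into one array only to keep the example short; real code would usually process each page and then let it go.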

Store#getLastAdded()

Return the biggest added number in the store. All actions in this log have the same or a smaller added number.

Returns Promise<number>. Promise with the biggest added number.

Store#getLastSynced()

Get added values for the latest synchronized received/sent events.

Returns Promise<LastSynced>. Promise with the added values.

Store#remove(id)

Remove action from store.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | false>. Promise with entry if action was in store.

Store#removeReason(reason, criteria, callback)

Remove reason from action’s metadata and remove actions without reasons.

Property  Type      Description
callback  listener  Callback for every removed action.
criteria  object    Actions criteria.
reason    string    The reason name.

Returns Promise. Promise that resolves when cleaning is finished.
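
The cleaning rule can be sketched on a plain array of entries: drop the reason from every matching action and remove the actions left with no reasons. The function below is a hypothetical, synchronous stand-alone version; its argument order and the single maxAdded criterion are assumptions made for illustration.

```javascript
// Hypothetical stand-alone version working on an array of [action, meta].
// Only the `maxAdded` criterion is sketched here.
function removeReason (entries, reason, criteria = {}, callback = () => {}) {
  return entries.filter(([action, meta]) => {
    const matches =
      criteria.maxAdded === undefined || meta.added <= criteria.maxAdded
    if (!matches || !meta.reasons.includes(reason)) return true
    // Drop the reason tag from this action's metadata.
    meta.reasons = meta.reasons.filter(i => i !== reason)
    if (meta.reasons.length > 0) return true
    // No reasons left: report the action and drop it from the result.
    callback(action, meta)
    return false
  })
}
```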

Store#setLastSynced(values)

Set added values for the latest synchronized received and/or sent events.

Property  Type        Description
values    LastSynced  Object with latest sent or received values.

Returns Promise. Promise that resolves when the values are saved to the store.
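
Since either value may be passed on its own, a store has to update only the keys it receives. A minimal sketch of that behavior, with SyncedPositions as a hypothetical class name and zero as an assumed starting value:

```javascript
// Hypothetical holder for the two `added` positions kept by a store.
class SyncedPositions {
  constructor () {
    this.values = { received: 0, sent: 0 }
  }

  // Only the keys present in `values` are updated.
  async setLastSynced (values) {
    if (values.sent !== undefined) this.values.sent = values.sent
    if (values.received !== undefined) this.values.received = values.received
  }

  // Resolves to a copy shaped like LastSynced.
  async getLastSynced () {
    return { ...this.values }
  }
}
```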


Log to be used in tests. It already has a memory store, a node ID, and a special test timer.

Use TestTime to create test log.

Extends Log.

Property     Type                Description
id           number              Log sequence number created from this test time.
opts         object?             Options.
opts.nodeId  (string | number)?  Unique current machine name.
opts.store   Store?              Store for log. Will use MemoryStore by default.
time         TestTime            This test time.

import { TestTime } from 'logux-core'

it('tests log', () => {
  const log = TestTime.getLog()
})

it('tests 2 logs', () => {
  const time = new TestTime()
  const log1 = time.nextLog()
  const log2 = time.nextLog()
})

TestLog#nodeId

Type: string | number.

Unique node ID. It is used in action IDs.

TestLog#actions()

Return all actions (without metadata) inside the log, sorted by created time.

This shortcut works only with MemoryStore. To use it, do not change store type by store option in TestTime.getLog.

Returns Action[]. Log’s actions.

expect(log.actions()).toEqual([
  { type: 'A' }
])

TestLog#add(action, meta?)

Add action to log.

It will set id and time (if they were missing) and the added property in meta, and call all listeners.

Property  Type    Description
action    Action  The new action.
meta      Meta?   Open structure for action metadata.

Returns Promise<Meta | false>. Promise with meta if action was added to log or false if action was already in log.

removeButton.addEventListener('click', () => {
  log.add({ type: 'users:remove', user: id })
})

TestLog#byId(id)

Find the action with this ID in the log.

Property  Type    Description
id        string  Action ID.

Returns Promise<Entry | Nope>. Promise with entry.

if (action.type === 'logux/undo') {
  const [undidAction, undidMeta] = await log.byId(action.id)
  log.changeMeta(meta.id, { reasons: undidMeta.reasons })
}

TestLog#changeMeta(id, diff)

Change action metadata. Setting reasons: [] removes the action.

Property  Type    Description
diff      object  Object with values to change in action metadata.
id        string  Action ID.

Returns Promise<boolean>. Promise with true if metadata was changed or false on unknown ID.

await process(action)
log.changeMeta(meta.id, { status: 'processed' })

TestLog#each(callback, opts?)

Iterates through all actions, from last to first.

Return false from callback if you want to stop iteration.

Property  Type      Description
callback  iterator  Function will be executed on every action.
opts      object?   Iterator options.

Returns Promise. Promise that resolves when iteration is stopped by the iterator or reaches the end of the actions.

log.each((action, meta) => {
  // Stop as soon as we reach actions older than the last processed beep.
  if (isFirstOlder(meta, lastBeep)) {
    return false
  } else if (action.type === 'beep') {
    beep()
    lastBeep = meta
    return false
  }
})

TestLog#entries()

Return all entries (with metadata) inside log, sorted by created time.

This shortcut works only with MemoryStore. To use it, do not change store type by store option in TestTime.getLog.

Returns Entry[]. Log’s entries.

expect(log.entries()).toEqual([
  [{ type: 'A' }, { id: '1 test1 0', time: 1, added: 1, reasons: ['t'] }]
])

TestLog#generateId()

Generate next unique action ID.

Returns string. Unique action ID.

const id = log.generateId()

TestLog#on(event, listener)

Subscribe for log events. It implements nanoevents API. Supported events:

  • preadd: when somebody tries to add an action to the log. It fires before the ID check. The best place to add a reason.
  • add: when a new action was added to the log.
  • clean: when an action was cleaned from the store.

Property  Type                        Description
event     "preadd" | "add" | "clean"  The event name.
listener  listener                    The listener function.

Returns function. Unbind listener from event.

const unbind = log.on('add', (action, meta) => {
  if (action.type === 'beep') beep()
})

function disableBeeps () {
  unbind()
}

TestLog#removeReason(reason, criteria?)

Remove reason tag from action’s metadata and remove actions without reason from log.

Property  Type     Description
criteria  object?  Actions criteria.
reason    string   Reason’s name.

Returns Promise. Promise that resolves when cleaning is finished.

function onSync (lastSent) {
  log.removeReason('unsynchronized', { maxAdded: lastSent })
}

Two paired loopback connections with events tracking to be used in Logux tests.

Extends LocalPair.

Property  Type     Description
delay     number?  Delay for connection and send events.

import { ClientNode, TestPair } from 'logux-core'

it('tracks events', async () => {
  const pair = new TestPair()
  const client = new ClientNode(pair.right)
  await pair.left.connect()
  expect(pair.leftEvents).toEqual([['connect']])
  await pair.left.send(msg)
  expect(pair.leftSent).toEqual([msg])
})

TestPair#delay

Type: number.

Delay for connection and send events to emulate real connection latency.

TestPair#left

Type: Connection.

First connection. Will be connected to LocalPair#right one after Connection#connect.

new ClientNode(pair.left)

TestPair#leftEvents

Type: Array[].

Emitted events from TestPair#left connection.

await pair.left.connect()
pair.leftEvents //=> [['connect']]

TestPair#leftNode

Type: BaseNode.

Node instance used in this test, connected with TestPair#left.

function createTest () {
  const test = new TestPair()
  test.leftNode = new ClientNode('client', log, test.left)
  return test
}

TestPair#leftSent

Type: Message[].

Sent messages from TestPair#left connection.

await pair.left.send(msg)
pair.leftSent //=> [msg]

TestPair#right

Type: Connection.

Second connection. Will be connected to LocalPair#left one after Connection#connect.

new ServerNode(pair.right)

TestPair#rightEvents

Type: Array[].

Emitted events from TestPair#right connection.

await pair.right.connect()
pair.rightEvents //=> [['connect']]

TestPair#rightNode

Type: BaseNode.

Node instance used in this test, connected with TestPair#right.

function createTest () {
  const test = new TestPair()
  test.rightNode = new ServerNode('server', log, test.right)
  return test
}

TestPair#rightSent

Type: Message[].

Sent messages from TestPair#right connection.

await pair.right.send(msg)
pair.rightSent //=> [msg]

TestPair#clear()

Clear all connection events and messages to test only the latest events.

await client.connection.connect()
pair.clear() // Remove all connecting messages
await client.log.add({ type: 'a' })
expect(pair.leftSent).toEqual([
  ['sync', …]
])

TestPair#wait(receiver?)

Return a Promise that resolves on the next event.

Property  Type                 Description
receiver  ("left" | "right")?  Wait for specific receiver event.

Returns Promise. Promise that resolves on the next event.

pair.left.send(['test'])
await pair.wait('left')
pair.leftSent //=> [['test']]

Creates special logs for test purposes.

Real logs use real time in action IDs, so log content will be different on every test execution.

To fix this, Logux has special logs for tests with a simple sequence timer. All logs from one test should share the same time, which is why you should use a log creator to share time between all logs in one test.

import { TestTime } from 'logux-core'

it('tests log', () => {
  const log = TestTime.getLog()
})

it('tests 2 logs', () => {
  const time = new TestTime()
  const log1 = time.nextLog()
  const log2 = time.nextLog()
})
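
To make the idea of a shared sequence timer concrete, here is a small model of it. TinyTime is a hypothetical class, not the TestTime source; the ID layout follows the '1 test1 0' value shown in the TestLog#entries example, and the rest is an assumption.

```javascript
// Hypothetical model of a shared sequence timer; not the TestTime source.
class TinyTime {
  constructor () {
    this.lastId = 0 // Number used in the latest log's node ID
    this.lastTime = 0 // Shared counter that replaces the real clock
  }

  // Every log created here reads time from the same counter.
  nextLog () {
    this.lastId += 1
    const nodeId = 'test' + this.lastId
    const time = this
    return {
      nodeId,
      generateId () {
        time.lastTime += 1
        return time.lastTime + ' ' + nodeId + ' 0'
      }
    }
  }
}
```

Because both logs advance the same counter, the generated IDs are identical on every run and still keep a global order across logs.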

TestTime.getLog(opts?)

Shortcut to create time and generate single log. Use it only if you need one log in test.

Property  Type     Description
opts      object?  Log options.

Returns TestLog. Test log in this time.

it('tests log', () => {
  const log = TestTime.getLog()
})

TestTime#lastId

Type: number.

Last used number in log’s nodeId.

TestTime#nextLog(opts?)

Return next test log in same time.

Property  Type     Description
opts      object?  Log options.

Returns TestLog. Test log in this time.

it('tests 2 logs', () => {
  const time = new TestTime()
  const log1 = time.nextLog()
  const log2 = time.nextLog()
})

Functions

eachStoreCheck(test)

Pass all common tests for Logux store to callback.

Property  Type     Description
test      creator  Callback to create tests in your test framework.

import { eachStoreCheck } from 'logux-core'

eachStoreCheck((desc, creator) => {
  it(desc, creator(() => new CustomStore()))
})

isFirstOlder(firstMeta, secondMeta)

Compare time, when log entries were created.

It uses meta.time and meta.id to detect entries order.

Property    Type  Description
firstMeta   Meta  Some action’s metadata.
secondMeta  Meta  Other action’s metadata.

Returns boolean. Whether the first action is older than the second.

import { isFirstOlder } from 'logux-core'

if (isFirstOlder(lastBeep, meta)) {
  beep(action)
  lastBeep = meta
}
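
The ordering rule can be illustrated with a simplified comparison: creation time decides first, and the ID only breaks ties. isOlderSketch is a hypothetical function; the '<time> <nodeId> <counter>' ID layout and the tie-break order are assumptions, and the real isFirstOlder may differ in details.

```javascript
// Simplified sketch; assumes IDs look like '<time> <nodeId> <counter>'.
function isOlderSketch (firstMeta, secondMeta) {
  // Creation time decides the order whenever it differs.
  if (firstMeta.time !== secondMeta.time) {
    return firstMeta.time < secondMeta.time
  }
  const [, firstNode, firstCounter] = firstMeta.id.split(' ')
  const [, secondNode, secondCounter] = secondMeta.id.split(' ')
  // Same time: fall back to node ID, then to the per-node counter.
  if (firstNode !== secondNode) return firstNode < secondNode
  return Number(firstCounter) < Number(secondCounter)
}
```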

Callbacks

authCallback

Property     Type    Description
credentials  object  Remote node credentials.
nodeId       string  Unique ID of remote node instance.

Returns Promise<boolean>. Promise with boolean value.
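
A callback of this shape might look like the sketch below. The TOKENS table, the token field inside credentials, and the argument order (credentials, then nodeId) are assumptions made for illustration; a real application would check credentials against its own backend.

```javascript
// Hypothetical token table; a real application would query its own backend.
const TOKENS = new Map([['10:uuid', 'secret-token']])

// Follows the authCallback shape: resolves to true only for a known token.
async function auth (credentials, nodeId) {
  if (!credentials || typeof credentials.token !== 'string') return false
  return TOKENS.get(nodeId) === credentials.token
}
```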

authenticator

Property     Type          Description
client       ServerClient  Client object.
credentials  any           The client credentials.
userId       string        User ID.

Returns boolean | Promise<boolean>. true if the credentials were correct.

authorizer

Property  Type     Description
action    Action   The action data.
ctx       Context  Information about the node that created this action.
meta      Meta     The action metadata.

Returns boolean | Promise<boolean>. true if the client is allowed to use this action.

channelAuthorizer

Property  Type            Description
action    Action          The action data.
ctx       ChannelContext  Information about the node that created this action.
meta      Meta            The action metadata.

Returns boolean | Promise<boolean>. true if the client is allowed to subscribe to this channel.

channelFilter

Property  Type     Description
action    Action   The action data.
ctx       Context  Information about the node that created this action.
meta      Meta     The action metadata.

Returns boolean. Should action be sent to client.

creator

Property   Type     Description
generator  creator  The test creator.
name       string   The test name.

errorListener

Property  Type    Description
error     string  The error description.

filter

Property  Type    Description
action    Action  New action from log.
meta      Meta    New action metadata.

Returns Promise<boolean>. Promise with true if action should be synchronized with remote log.
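
For illustration, a filter of this shape and one way it could be applied to a list of entries are sketched below. onlyPublic, selectEntries, and the 'public/' type prefix are hypothetical; how Logux itself calls the filter is not shown here.

```javascript
// Hypothetical filter: only actions whose type starts with 'public/' pass.
async function onlyPublic (action, meta) {
  return action.type.startsWith('public/')
}

// Hypothetical helper that keeps the entries accepted by a filter.
async function selectEntries (entries, filter) {
  const selected = []
  for (const [action, meta] of entries) {
    if (await filter(action, meta)) selected.push([action, meta])
  }
  return selected
}
```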

filterCreator

Property  Type            Description
action    Action          The action data.
ctx       ChannelContext  Information about the node that created this action.
meta      Meta            The action metadata.

Returns channelFilter | undefined. Actions filter.

generator

Property  Type   Description
store     Store  The store instance.

Returns function. The test function to be used in test framework.

initialized

Property  Type            Description
action    Action          The action data.
ctx       ChannelContext  Information about the node that created this action.
meta      Meta            The action metadata.

Returns Promise | undefined. Promise during initial actions loading.

iterator

Property  Type    Description
action    Action  Next action.
meta      Meta    Next action’s metadata.

Returns boolean. Returning false will stop iteration.

listener

Property  Type    Description
action    Action  New action.
meta      Meta    The action’s metadata.

mapper

Property  Type    Description
action    Action  New action from log.
meta      Meta    New action metadata.

Returns Promise<Entry>. Promise with array of changed action and changed metadata.

next

Returns Promise<Page>. Promise with next Page.

processor

Property  Type     Description
action    Action   The action data.
ctx       Context  Information about the node that created this action.
meta      Meta     The action metadata.

Returns Promise | undefined. Promise when processing will be finished.

resender

Property  Type     Description
action    Action   The action data.
ctx       Context  Information about the node that created this action.
meta      Meta     The action metadata.

Returns object | Promise<object>. Meta’s keys.


Types

Action

Type: object.

Action from the log.

Property  Type    Description
type      string  Action type name.

{ type: 'add', id: 'project:12:price', value: 12 }

Entry

Type: Array.

Array with Action and its Meta.

Property  Type    Description
0         Action  Action’s object.
1         Meta    Action’s metadata.

LastSynced

Type: object.

The added values for latest synchronized received/sent events.

Property  Type    Description
received  number  The added value of latest received event.
sent      number  The added value of latest sent event.

Message

Type: Array.

Logux protocol message. It is an array with a string of the message type in the first position and strings, numbers, objects, or arrays in the next positions.

Property  Type    Description
0         string  Message type.
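
The shape described above can be checked with a few lines of code. looksLikeMessage is a hypothetical helper; it validates only the array shape and does not know which message types the protocol defines.

```javascript
// Hypothetical shape check: a non-empty array whose first item is a string.
function looksLikeMessage (value) {
  return (
    Array.isArray(value) &&
    value.length > 0 &&
    typeof value[0] === 'string'
  )
}
```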

Meta

Type: object.

Action’s metadata.

Property  Type       Description
added     number     Sequence number of action in current log. Log#add will fill it.
id        string     Action unique ID. Log#add sets it automatically.
keepLast  string?    Set this code as a reason and remove this reason from previous actions.
reasons   string[]?  Why action should be kept in log. Action without reasons will be removed.
time      number?    Action created time. Milliseconds since UNIX epoch.

Nope

Type: Array.

If the entry was not found, Log returns [null, null].

Property  Type  Description
0         null
1         null
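
Returning [null, null] instead of a bare null means callers can always destructure the result. A short sketch of that pattern, where byId and describe are hypothetical stand-alone functions over an array of entries:

```javascript
// Hypothetical lookup that follows the Entry | Nope contract.
async function byId (entries, id) {
  const found = entries.find(([, meta]) => meta.id === id)
  return found || [null, null]
}

// Destructuring works for both cases; a null action means "not found".
async function describe (entries, id) {
  const [action, meta] = await byId(entries, id)
  if (action === null) return 'unknown action ' + id
  return action.type + ' added as ' + meta.added
}
```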

Page

Type: object.

Part of log from Store.

Property  Type     Description
entries   Entry[]  Pagination page.

Constants

ALLOWED_META

Type: string[].

List of meta keys permitted for clients.

const { ALLOWED_META } = require('@logux/server')

async function outMap (action, meta) {
  const filtered = { }
  for (const i in meta) {
    if (ALLOWED_META.includes(i)) {
      filtered[i] = meta[i]
    }
  }
  return [action, filtered]
}