Asyncing Feeling about JavaScript Generators

This post first appeared on the Big Nerd Ranch blog.

Want the TL;DR version? Here’s a gist of all three examples.

Async generators and async iteration have arrived! Err, they’ve reached Stage 3, which means they are likely to ship in a future version of JavaScript. Until then, you can enable Stage 3 proposals in Babel to try them out in your own projects.

The web is essentially a decentralized app runtime, so subpar language additions have permanent consequences since future standards must remain backwards compatible. So for a feature to be accepted into the ECMAScript standard, it has to be incredibly compelling—it takes more than snazzy syntax or theoretical elegance for a feature to make the cut.

With that in mind, we should expect async generators and async iteration to substantially influence how we architect our future code while also addressing a contemporary problem. Let’s investigate how async generators work and examine the challenges of using them in “real” codebases.

Recap: How Async Generators Work

In a nutshell, async generators are like regular generator functions, but they yield Promises. If you aren’t familiar with ES2015 generator functions, check out Chris Aquino’s blog, then watch Jafar Husain’s excellent talk on Async Programming.

To recap, regular generator functions are basically a cross between the Iterator and Observer patterns. A generator is a pausable function that you can “step” through by calling .next(). You can pull a value out of a generator multiple times with .next(), or push a value into the same function multiple times with .next(valueToPush). This dual interface allows you to imitate both an Iterator and Observer with the same syntax!
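To make that dual interface concrete, here is a quick sketch (not from the original post) of the same generator syntax used both ways:

```javascript
// Pull values out of a generator (Iterator style)
var counter = function*() {
  var i = 0;
  while (true) {
    yield i++;
  }
};

var gen = counter();
gen.next().value; // => 0
gen.next().value; // => 1

// Push values into a generator (Observer style)
var sink = function*() {
  while (true) {
    var received = yield;
    console.log(received);
  }
};

var obs = sink();
obs.next();        // prime it: run to the first yield
obs.next('hello'); // logs "hello"
```

Note the priming call: a generator used as an Observer must be advanced to its first yield before it can receive a pushed value.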

However, generators have a disadvantage: they must immediately (synchronously) return data when .next() is invoked. Put another way, the code that consumes the data by calling .next() is in control of data flow. This is fine when the generator can generate new data on demand, but generators are not a good fit for iterating over asynchronous (or temporal) data sources, where the source itself controls when the next chunk of data is available.

WebSocket messages are a good example of an asynchronous data source. If we had a list of all the messages we would ever receive, we could iterate over them synchronously. But of course, we can’t know when messages will be received, so we need a mechanism to iterate lazily over messages as they arrive. Async generators and async iteration let us do just that!

TL;DR: generator functions are for data sources where the data consumer is in control, whereas async generators allow the data source itself to be in control.

Simple Example: Generate and Consume an AsyncGenerator

Let’s exercise our async chops with an example. We want to write an async generator function that repeatedly generates a new number after waiting a random number of milliseconds. Over a period of several seconds it might generate five or so numbers starting from 0. Let’s first write a helper function that generates a Promise to represent a timer:

// Create a Promise that resolves after ms time
var timer = function(ms) {
  return new Promise(resolve => {
    setTimeout(resolve, ms);
  });
};

Calling timer(5000) returns a Promise that will resolve in 5 seconds. Now we’re ready to write an async generator:

// Repeatedly generate a number starting
// from 0 after a random amount of time
var source = async function*() {
  var i = 0;
  while (true) {
    await timer(Math.random() * 1000);
    yield i++;
  }
};

So much complexity hiding behind such elegance! Our async generator function waits a random amount of time, then yields the next number in the count-up. If we didn’t have async generators, we could try using a regular generator function to yield Promises like this:

var source = function*() {
  var i = 0;
  while (true) {
    yield timer(Math.random() * 1000)
      .then(() => i++);
  }
};
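For a taste of that boilerplate, here is roughly what consuming such a Promise-yielding generator by hand looks like (a sketch; the finite generator here is a simplified stand-in so the loop terminates):

```javascript
// A simplified, finite Promise-yielding generator
var finiteSource = function*() {
  var i = 0;
  while (i < 3) {
    yield Promise.resolve(i++);
  }
};

// Manually pull each Promise out, and only ask for the
// next value once the current one has resolved
var results = [];
var gen = finiteSource();
var pull = () => {
  var result = gen.next();
  if (result.done) return;
  result.value.then(n => {
    results.push(n);
    pull();
  });
};
pull();
// results eventually becomes [0, 1, 2]
```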

However, there are some edge cases and boilerplate we’d have to handle, so it’s nice to have a dedicated function type! Now we’re ready to write the consuming code; because we need the await operator, we’ll create an async run() function.

// Tie everything together
var run = async function() {
  var stream = source();
  for await (let n of stream) {
    console.log(n);
  }
};

run();
// => 0
// => 1
// => 2
// => 3
// ...

What magic, and in under 20 lines of code! First, we invoke the source async generator function, which returns a special AsyncGenerator object. Then we use the for await...of loop syntax—called “asynchronous iteration”—to loop over numbers one-by-one as source generates them.
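Under the hood, for await...of is approximately sugar for calling .next() and awaiting each result. A hand-rolled equivalent might look like this (a sketch, using a finite stand-in for source so the loop ends):

```javascript
// A finite stand-in for source()
var finiteSource = async function*() {
  for (var i = 0; i < 3; i++) {
    yield i;
  }
};

// Roughly what for await...of does for us
var seen = [];
var runByHand = async function() {
  // An async generator is its own async iterator
  var it = finiteSource()[Symbol.asyncIterator]();
  while (true) {
    var { value, done } = await it.next();
    if (done) break;
    seen.push(value);
    console.log(value);
  }
};

runByHand();
// => 0
// => 1
// => 2
```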

But we can level up: suppose we want to square the numbers generated by source. We could square directly inside the for await...of loop, but it’d be better to “transform” the stream of values outside the loop, similar to using .map() to transform an array of values. It’s quite straightforward:

// Return a new async iterator that applies a
// transform to the values from another async generator
var map = async function*(stream, transform) {
  for await (let n of stream) {
    yield transform(n);
  }
};

Then we just need to add a line to the run() function:

 // Tie everything together
 var run = async function() {
   var stream = source();
+  // Square values generated by source() as they arrive
+  stream = map(stream, n => n * n);
   for await (let n of stream) {
     console.log(n);
   }
 };

Now when we run() everything:

// => 0
// => 1
// => 4
// => 9
// ...
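In the same spirit as map(), we could cap the infinite stream with a hypothetical take() helper (a sketch, not part of the original post):

```javascript
// Yield at most n values from any async iterable, then stop
var take = async function*(stream, n) {
  var i = 0;
  for await (let value of stream) {
    if (i++ >= n) return;
    yield value;
  }
};

// Demo: trim an infinite async counter down to 3 values
var counter = async function*() {
  var i = 0;
  while (true) {
    yield i++;
  }
};

var collected = [];
var demo = async function() {
  for await (let n of take(counter(), 3)) {
    collected.push(n);
  }
};
demo();
// collected eventually becomes [0, 1, 2]
```

Dropped into run(), a line like `stream = take(stream, 5)` would end the for await...of loop after five numbers.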

Impressive! But perhaps generating counting numbers isn’t especially innovative.

Medium Example: Write an AsyncIterator for WebSockets

The usual way to respond to incoming WebSocket messages is to attach an event listener:

var ws = new WebSocket('ws://localhost:3000/');
ws.addEventListener('message', event => {
  console.log(event.data);
});

But if we treated WebSocket messages as a stream, it seems natural to “iterate” over messages as they arrive. Unfortunately, WebSockets are not yet async iterable, but we can write our own polyfill in just a few lines. Here’s what our run() function will look like:

// Tie everything together
var run = async () => {
  var ws = new WebSocket('ws://localhost:3000/');
  for await (let message of ws) {
    console.log(message);
  }
};

Now for that polyfill. You may recall from Chris Aquino’s blog series that, for an object to be iterable with the for...of loop, you must define the Symbol.iterator property on that object. Similarly, to make an object async iterable with the for await...of loop, its Symbol.asyncIterator property must be defined. Here’s an implementation:

// Add an async iterator to all WebSockets
WebSocket.prototype[Symbol.asyncIterator] = async function*() {
  // 3 === WebSocket.CLOSED
  while (this.readyState !== 3) {
    yield (await oncePromise(this, 'message')).data;
  }
};

This async iterator waits to receive a message, then yields the data attribute of the WebSocket’s MessageEvent. The oncePromise() function is a bit of a hack: it returns a Promise that resolves when an event occurs, then immediately unsubscribes:

// Generate a Promise that listens only once for an event
var oncePromise = (emitter, event) => {
  return new Promise(resolve => {
    var handler = (...args) => {
      emitter.removeEventListener(event, handler);
      resolve(...args);
    };
    emitter.addEventListener(event, handler);
  });
};

It seems inefficient, but it really tidies up our async iterator. If you have a chatty WebSocket server running at ws://localhost:3000, you can watch messages stream in by invoking run():

run();
// => "hello"
// => "sandwich"
// => "otters"
// ...

Hard Example: Rewrite RxJS

Now for the ultimate challenge. Functional reactive programming (FRP) is all the rage in UI programming, and in JavaScript, RxJS is the most popular library for this programming style. RxJS models event sources as Observables—they’re like an event stream or lazy array that can be modified with familiar array idioms like map() and filter().

Since FRP complements JavaScript’s non-blocking philosophy, it’s possible an RxJS-like API will make it to a future version of JavaScript. Meantime, we can write our own RxJS clone with async generators in just 80 lines of code! Here’s the challenge:

  1. Listen for all click events
  2. Filter down to only clicks on anchor tags
  3. Only allow distinct clicks
  4. Map from click events to a click counter and the click event
  5. Throttle clicks to once every 500ms
  6. Print the click counter and event

This type of problem is right in RxJS’s wheelhouse, so we’ll try to replicate its approach. Here’s how we’ll exercise our implementation:

// Tie everything together
var run = async () => {
  var i = 0;
  var clicks = streamify('click', document.querySelector('body'));

  clicks = filter(clicks, e => e.target.matches('a'));
  clicks = distinct(clicks, e => e.target);
  clicks = map(clicks, e => [i++, e]);
  clicks = throttle(clicks, 500);

  subscribe(clicks, ([ id, click ]) => {
    console.log(id);
    console.log(click);
    click.preventDefault();
  });
};

run();

To make this work, we need to write six functions: streamify(), filter(), distinct(), map(), throttle() and subscribe().

// Turn any event emitter into a stream
var streamify = async function*(event, element) {
  while (true) {
    yield await oncePromise(element, event);
  }
};

streamify() is just like the WebSocket async iterator: oncePromise() uses .addEventListener() to listen once for an event, then resolves the Promise. By looping with while (true), we can listen for events indefinitely.

// Only pass along events that meet a condition
var filter = async function*(stream, test) {
  for await (var event of stream) {
    if (test(event)) {
      yield event;
    }
  }
};

filter() only yields events that pass the test. map() is almost identical:

// Transform every event of the stream
var map = async function*(stream, transform) {
  for await (var event of stream) {
    yield transform(event);
  }
};

Instead of testing before yielding, map() simply transforms the event before yielding. distinct() shows one of the superpowers of async generators: they can persist state with local variables!

var identity = e => e;

// Only pass along events that differ from the last one
var distinct = async function*(stream, extract = identity) {
  var lastVal;
  var thisVal;
  for await (var event of stream) {
    thisVal = extract(event);
    if (thisVal !== lastVal) {
      lastVal = thisVal;
      yield event;
    }
  }
};

Last, the mighty throttle() function resembles distinct(): it tracks the timestamp of the last event and only yields it if a certain amount of time has passed since the last yielded event.

// Only pass along event if some time has passed since the last one
var throttle = async function*(stream, delay) {
  var lastTime;
  var thisTime;
  for await (var event of stream) {
    thisTime = (new Date()).getTime();
    if (!lastTime || thisTime - lastTime > delay) {
      lastTime = thisTime;
      yield event;
    }
  }
};

Finally, we need to print out the click event and counter for every event that made it this far. subscribe() is trivial: it just loops over every event and runs the callback, no yields necessary.

// Invoke a callback every time an event arrives
var subscribe = async (stream, callback) => {
  for await (var event of stream) {
    callback(event);
  }
};

And with that, we’ve written our own functional reactive pipeline!

Check out the gist if you want to try out any of these examples.

Challenges

Async generators are pretty awesome. Whereas generator functions allow us to pull data out of an iterator, async generators let us iterate over data that is “pushed” to us. They’re a great abstraction for asynchronous data structures. However, there are some caveats.

First, implementing support for the for await...of loop on arbitrary objects is a bit gnarly unless you use yield and await inside an async generator. Notably, converting anything with .addEventListener() is tricky because you can’t use the yield operator within the callback:

var streamify = async function*(event, element) {
  element.addEventListener(event, e => {
    // This doesn't work because yield is being
    // called from inside another function.
    yield e;
  });
};

Similarly, you can’t use yield within .forEach() or other functional methods. This is an inherent limitation since there’s no guarantee yield won’t be used after the generator has already finished.

To sidestep this, we wrote the oncePromise() helper. Apart from potential performance issues, it’s important to note that Promise callbacks always execute after the current callstack has finished. In browsers that run Promise callbacks as microtasks, this shouldn’t cause issues, but some Promise polyfills won’t run the callbacks until the next run of the event loop. Consequently, invoking the .preventDefault() method may have no effect since the DOM event may have already bubbled to the browser.
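The ordering is easy to observe. With spec-compliant Promises, callbacks run as microtasks: after the current call stack unwinds, but before the next turn of the event loop (a minimal demonstration, assuming native Promises):

```javascript
var order = [];
order.push('sync 1');
Promise.resolve().then(() => order.push('microtask'));
order.push('sync 2');

// The synchronous code finishes before the Promise callback runs
setTimeout(() => {
  console.log(order); // => ['sync 1', 'sync 2', 'microtask']
}, 0);
```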

JavaScript now has several asynchronous stream datatypes: Stream, AsyncGenerator and eventually Observable. While all three fall into the continuum of “pushed” data sources, there are subtle semantic differences in how they handle back pressure and control the underlying resource. If you’re interested in the finer facets of functional reactive semantics, check out the General Theory of Reactivity.

More to Come

In the arms race for language features, JavaScript is no slacker. Destructuring in ES2015, async functions in ES2017, and now async iteration enable JavaScript to elegantly tackle the complexities of UI and I/O programming without resorting to the usual unpredictability of multi-threading.

And there’s much more to come! So keep an eye on the blog and the TC39 proposals repo for new goodies. Meantime, you can start using async generator functions in your own code by enabling Stage 3 proposals in Babel.

Jonathan Martin

Globe-trotting web developer, instructor, international speaker and fine art landscape photographer from Atlanta, GA.
