json-stream

Simple streaming JSON parser and encoder.

When reading JSON data, json-stream can decode it in a streaming manner, providing a pythonic dict/list-like interface or a visitor-based interface. It can stream from files, URLs or iterators.

When writing JSON data, json-stream can stream JSON objects as you generate them.

These techniques allow you to reduce memory consumption and latency.

Reading

json-stream is a JSON parser just like the standard library's json.load(). It will read a JSON document and convert it into native python types.

import json_stream
data = json_stream.load(f)

Features:

  • stream all JSON data types (objects, lists and simple types)
  • stream nested data
  • simple pythonic list-like/dict-like interface
  • stream truncated or malformed JSON data (up to the first error)
  • native code parsing speedups for most common platforms
  • pure python fallback if native extensions not available

Unlike json.load(), json-stream can stream JSON data from any file-like or iterable object. This has the following benefits:

  • it does not require the whole json document to be read into memory up-front
  • it can start producing data before the entire document has finished loading
  • it only requires enough memory to hold the data currently being parsed

There are specific integrations for streaming JSON data from URLs using requests, httpx, or urllib.

The objects that json-stream produces can be re-output using json.dump() with a little work.

Usage

json_stream.load()

json_stream.load() has two modes of operation, controlled by the persistent argument (default False).

It is also possible to "mix" the modes as you consume the data.

Transient mode (default)

This mode is appropriate if you can consume the data iteratively. You cannot move backwards through the stream to read data that has already been skipped over. It is the mode you must use if you want to process large amounts of JSON data without consuming large amounts of memory.

In transient mode, only the data currently being read is stored in memory. Any data previously read from the stream is discarded (it's up to you what to do with it) and attempting to access this data results in a TransientAccessException.

import json_stream

# JSON: {"count": 3, "results": ["a", "b", "c"]}
data = json_stream.load(f)  # data is a transient dict-like object 
# stream has been read up to "{"

# use data like a dict
results = data["results"]  # results is a transient list-like object
# stream has been read up to "[", we now cannot read "count"

# iterate transient list
for result in results:
    print(result)  # prints a, b, c
# stream has been read up to "]"

# attempt to read "count" from earlier in stream
count = data["count"]  # will raise exception
# stream is now exhausted

# attempt to read from list that has already been iterated
for result in results:  # will raise exception
    pass
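
The forward-only behaviour described above can be pictured with a small stand-alone model. This is only an illustrative sketch and not json-stream's implementation: ForwardOnlyList and TransientAccessError are hypothetical names (the library raises its own TransientAccessException), and only non-negative indexes are handled.

```python
class TransientAccessError(Exception):
    """Hypothetical stand-in for json-stream's TransientAccessException."""


class ForwardOnlyList:
    """Toy model of a transient list: items are discarded once passed."""

    def __init__(self, iterable):
        self._items = iter(iterable)
        self._next_index = 0  # index of the next item the source will yield

    def __getitem__(self, index):
        if index < self._next_index:
            raise TransientAccessError(f"index {index} already passed")
        # skip forward, dropping everything before the requested index
        while True:
            try:
                value = next(self._items)
            except StopIteration:
                raise IndexError(index) from None
            self._next_index += 1
            if self._next_index - 1 == index:
                return value


results = ForwardOnlyList(["a", "b", "c"])
second = results[1]  # skips "a" and returns "b"

try:
    results[0]  # "a" was discarded when it was skipped
    went_backwards = True
except TransientAccessError:
    went_backwards = False
```

Nothing is stored besides a counter, which is why memory use stays flat regardless of how long the source is.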

Persistent mode

In persistent mode all previously read data is stored in memory as it is parsed. The returned dict-like or list-like objects can be used just like normal data structures.

If you request an index or key that has already been read from the stream then it is retrieved from memory. If you request an index or key that has not yet been read from the stream, then the request blocks until that item is found in the stream.

import json_stream

# JSON: {"count": 1, "results": ["a", "b", "c"]}
data = json_stream.load(f, persistent=True)
# data is a streaming  dict-like object 
# stream has been read up to "{"

# use data like a dict
results = data["results"]  # results is a streaming list-like object
# stream has been read up to "["
# count has been stored in data

# use results like a list
a_result = results[1]  # a_result = "b"
# stream has been read up to the middle of list
# "a" and "b" have been stored in results

# read earlier data from memory
count = data["count"]  # count = 1

# consume rest of list
results.read_all()
# stream has been read up to "}"
# "c" is now stored in results too
# results.is_streaming() == False

# consume everything
data.read_all()
# stream is now exhausted
# data.is_streaming() == False

Persistent mode is not appropriate if you care about memory consumption, but provides an identical experience compared to json.load().
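
As a rough mental model of persistent mode, the sketch below reads from a source only as far as a request requires and keeps everything it has seen. It is an assumption-laden illustration, not the library's code: CachingList, source and consumed are hypothetical names, and only non-negative indexes are handled.

```python
class CachingList:
    """Toy model of a persistent list: items are kept once read."""

    def __init__(self, iterable):
        self._items = iter(iterable)
        self._cache = []

    def __getitem__(self, index):
        # read forward only as far as needed, remembering every item seen
        while len(self._cache) <= index:
            try:
                self._cache.append(next(self._items))
            except StopIteration:
                raise IndexError(index) from None
        return self._cache[index]

    def read_all(self):
        # consume whatever is left in the source
        self._cache.extend(self._items)


consumed = []


def source():
    for item in ["a", "b", "c"]:
        consumed.append(item)  # record how far the "stream" has been read
        yield item


results = CachingList(source())
b = results[1]                     # reads "a" and "b" from the source
consumed_after_b = list(consumed)  # snapshot of what has been read so far
a = results[0]                     # served from memory, no further reading
results.read_all()                 # reads the remaining "c"
```

The cache grows with every item read, which is the memory cost the paragraph above warns about.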

Mixed mode

In some cases you will need to be able to randomly access some part of the data, but still only have that specific data taking up memory resources.

For example, you might have a very long list of objects, but you cannot always access the keys of the objects in stream order. You want to be able to iterate the list transiently, but access the result objects persistently.

This can be achieved using the persistent() method of all the list or dict-like objects json_stream produces. Calling persistent() causes the existing transient object to produce persistent child objects.

Note that the persistent() method makes the children of the object it is called on persistent, not the object it is called on.

import json_stream

# JSON: {"results": [{"x": 1, "y": 3}, {"y": 4, "x": 2}]}
# note that the keys of the inner objects are not ordered 
data = json_stream.load(f)  # data is a transient dict-like object 

# iterate transient list, but produce persistent items
for result in data['results'].persistent():
    # result is a persistent dict-like object
    print(result['x'])  # print x
    print(result['y'])  # print y (error on second result without .persistent())
    print(result['x'])  # print x again (error without .persistent())

The opposite is also possible, going from persistent mode to transient mode, though the use cases for this are more esoteric.

from io import StringIO
from json_stream import load

# JSON: {"a": 1, "x": ["long", "list", "I", "don't", "want", "in", "memory"], "b": 2}
# json is a str containing the document above
data = load(StringIO(json), persistent=True).transient()
# data is a persistent dict-like object that produces transient children

print(data["a"])  # prints 1
x = data["x"]  # x is a transient list, you can use it accordingly
print(x[0])  # prints long

# access earlier data from memory
print(data["a"])  # this would have raised an exception if data was transient

print(data["b"])  # prints 2

# we have now moved past all the data in the transient list
print(x[0])  # will raise exception

Visitor pattern

You can also parse using a visitor-style approach where a function you supply is called for each data item as it is parsed (depth-first).

This uses a transient parser under the hood, so does not consume memory for the whole document.

import json_stream

# JSON: {"x": 1, "y": {}, "xxxx": [1,2, {"yyyy": 1}, "z", 1, []]}

def visitor(item, path):
    print(f"{item} at path {path}")

json_stream.visit(f, visitor)

Output:

1 at path ('x',)
{} at path ('y',)
1 at path ('xxxx', 0)
2 at path ('xxxx', 1)
1 at path ('xxxx', 2, 'yyyy')
z at path ('xxxx', 3)
1 at path ('xxxx', 4)
[] at path ('xxxx', 5)
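
To make the call order concrete, here is a non-streaming sketch that walks an already-parsed document and calls a visitor with the same (item, path) arguments. It is inferred from the output shown above, where scalars and empty containers are reported as items; walk is a hypothetical helper, and unlike json_stream.visit() it loads the whole document first.

```python
import json


def walk(value, visitor, path=()):
    """Depth-first walk over already-parsed data (not streaming)."""
    if isinstance(value, dict) and value:
        for key, child in value.items():
            walk(child, visitor, path + (key,))
    elif isinstance(value, list) and value:
        for index, child in enumerate(value):
            walk(child, visitor, path + (index,))
    else:
        # scalars and empty containers are reported as items
        visitor(value, path)


calls = []
document = json.loads('{"x": 1, "y": {}, "xxxx": [1,2, {"yyyy": 1}, "z", 1, []]}')
walk(document, lambda item, path: calls.append((item, path)))
```

Each path is a tuple of dict keys and list indexes leading from the document root to the item.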

Stream a URL

json_stream knows how to stream directly from a URL using a variety of packages. Supported packages include:

  • Python's batteries-included urllib package
  • The popular requests library
  • The newer httpx library

urllib

urllib's response objects are already file-like objects, so we can just pass them directly to json-stream.

import urllib.request
import json_stream

with urllib.request.urlopen('http://example.com/data.json') as response:
    data = json_stream.load(response)

requests

To stream JSON data from requests, you must pass stream=True when making a request, and call json_stream.requests.load() passing the response.

import requests
import json_stream.requests

with requests.get('http://example.com/data.json', stream=True) as response:
    data = json_stream.requests.load(response)

Note: these functions use response.iter_content() under the hood with a chunk_size of 10k bytes. This default allows us to perform effective reads from the response stream and lower CPU usage. The drawback to this is that requests will buffer each read until up to 10k bytes have been read before passing the data back to json_stream. If you need to consume data more responsively the only option is to tune chunk_size back to 1 to disable buffering.
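
The trade-off in the note above can be illustrated with a simplified model of chunked reading. This is not how requests is implemented; buffered_chunks and the arrivals list are hypothetical and only show that a large chunk size delays delivery until enough bytes have accumulated, while a chunk size of 1 delivers every byte immediately at the cost of many more iterations.

```python
def buffered_chunks(pieces, chunk_size):
    """Yield data only once chunk_size bytes are available (or at the end)."""
    buffer = b""
    for piece in pieces:
        buffer += piece
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    if buffer:
        yield buffer  # final, possibly short, chunk


arrivals = [b'{"a":', b' 1, ', b'"b": 2}']  # hypothetical network reads

# large chunk size: nothing is delivered until the stream ends
large = list(buffered_chunks(arrivals, chunk_size=1024))

# chunk size of 1: every byte is delivered as soon as it arrives
small = list(buffered_chunks(arrivals, chunk_size=1))
```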

httpx

To stream JSON data from httpx, you must call stream() when making your request, and call json_stream.httpx.load() passing the response.

import httpx
import json_stream.httpx

with httpx.Client() as client, client.stream('GET', 'http://example.com/data.json') as response:
    data = json_stream.httpx.load(response)

Under the hood, this works similarly to the requests version above, including the caveat about chunk_size.

Stream a URL (with visitor)

The visitor pattern also works with URL streams.

urllib

import urllib.request
import json_stream

def visitor(item, path):
    print(f"{item} at path {path}")
    
with urllib.request.urlopen('http://example.com/data.json') as response:
    json_stream.visit(response, visitor)

requests

import requests
import json_stream.requests

def visitor(item, path):
    print(f"{item} at path {path}")
    
with requests.get('http://example.com/data.json', stream=True) as response:
    json_stream.requests.visit(response, visitor)

The chunk_size note also applies to visit().

httpx

import httpx
import json_stream.httpx

def visitor(item, path):
    print(f"{item} at path {path}")
    
with httpx.Client() as client, client.stream('GET', 'http://example.com/data.json') as response:
    json_stream.httpx.visit(response, visitor)

Stream an iterable

json-stream's parsing functions can take any iterable object that produces encoded JSON as bytes objects.

import json_stream

def some_iterator():
    yield b'{"some":'
    yield b' "JSON"}'

data = json_stream.load(some_iterator())
assert data['some'] == "JSON"

This is actually how the requests and httpx extensions work, as both libraries provide methods to iterate over the response content.
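
If your data source only offers a read()-style call, one way to obtain such an iterable is the two-argument form of iter(). The sketch below uses only the standard library; iter_chunks is a hypothetical helper name and io.BytesIO stands in for a real source.

```python
import io
from functools import partial


def iter_chunks(readable, chunk_size=4096):
    """Yield successive bytes chunks from a binary file-like object."""
    # iter(callable, sentinel) calls read(chunk_size) until it returns b""
    return iter(partial(readable.read, chunk_size), b"")


source = io.BytesIO(b'{"some": "JSON"}')
chunks = list(iter_chunks(source, chunk_size=5))
```

An iterator built this way yields bytes chunks just like some_iterator() in the example above, so it could be passed to json_stream.load() in the same way.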

Encoding json-stream objects

You can re-output (encode) persistent json-stream dict-like and list-like objects back to JSON using the built-in json.dump() or json.dumps() functions, but with a little additional work:

import json

import json_stream
from json_stream.dump import JSONStreamEncoder, default

data = json_stream.load(f, persistent=True)

# Option 1: supply json_stream.dump.default as the default argument
print(json.dumps(data, default=default))

# Option 2: supply json_stream.dump.JSONStreamEncoder as the cls argument
# This allows you to create your own subclass to further customise encoding
print(json.dumps(data, cls=JSONStreamEncoder))

If you are using a library that internally encodes the data you pass it using json.dump(), you can also use JSONStreamEncoder() as a context manager.

It works by monkey-patching the built-in JSONEncoder.default method during the scope of the with statement.

# library code
def some_library_function_out_of_your_control(arg):
    json.dumps(arg)

# your code
with JSONStreamEncoder():
    some_library_function_out_of_your_control(data)
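
The general technique of patching JSONEncoder.default for the duration of a with statement can be sketched with the standard library alone. This is not JSONStreamEncoder's actual code: patched_default, Point and encode_point are hypothetical names used only to show the mechanism.

```python
import json
from contextlib import contextmanager


class Point:
    """Hypothetical type that json cannot encode on its own."""

    def __init__(self, x, y):
        self.x, self.y = x, y


@contextmanager
def patched_default(fallback):
    """Temporarily replace JSONEncoder.default with fallback."""
    original = json.JSONEncoder.default
    json.JSONEncoder.default = lambda self, obj: fallback(obj)
    try:
        yield
    finally:
        json.JSONEncoder.default = original  # always restore the original


def encode_point(obj):
    if isinstance(obj, Point):
        return {"x": obj.x, "y": obj.y}
    raise TypeError(f"cannot encode {type(obj).__name__}")


with patched_default(encode_point):
    inside = json.dumps(Point(1, 2))  # the patch is active here

try:
    json.dumps(Point(1, 2))  # the patch has been removed again
    outside_failed = False
except TypeError:
    outside_failed = True
```

Because the patch is global to the process while it is active, it affects every json.dumps() call made inside the with block, including calls in code you do not control.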

Converting to standard Python types

To convert a json-stream dict-like or list-like object and all its descendants to a standard list and dict, you can use the json_stream.to_standard_types utility:

# JSON: {"round": 1, "results": [1, 2, 3]}
data = json_stream.load(f)
results = data["results"]
print(results)  # prints <TransientStreamingJSONList: TRANSIENT, STREAMING>
converted = json_stream.to_standard_types(results)
print(converted)  # prints [1, 2, 3]

Thread safety (experimental)

There is also a thread-safe version of the JSONStreamEncoder context manager:

from json_stream.dump.threading import ThreadSafeJSONStreamEncoder

# your code
with ThreadSafeJSONStreamEncoder():
    some_library_function_out_of_your_control(data)

The thread-safe implementation ensures that, when the context manager is used concurrently, the patch is applied only by the first thread entering a patched section and removed only when the last thread exits its patched section.

Additionally, if the patch is somehow called by a thread that is not currently in a patched section (i.e. some other thread calling json.dump) then that thread will block until the patch has been removed. While such an un-patched thread is active, any thread attempting to apply the patch is blocked.
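
The "first thread in applies, last thread out removes" rule is essentially reference counting under a lock. The sketch below shows only that part, under the assumption that a plain counter is enough; RefCountedPatch is a hypothetical name, and the blocking of un-patched threads described above is not modelled.

```python
import threading


class RefCountedPatch:
    """Apply a patch on first entry and remove it on last exit."""

    def __init__(self, apply, remove):
        self._apply = apply
        self._remove = remove
        self._lock = threading.Lock()
        self._users = 0  # number of with-blocks currently active

    def __enter__(self):
        with self._lock:
            if self._users == 0:
                self._apply()
            self._users += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        with self._lock:
            self._users -= 1
            if self._users == 0:
                self._remove()
        return False  # do not swallow exceptions


events = []
patch = RefCountedPatch(
    lambda: events.append("applied"),
    lambda: events.append("removed"),
)

with patch:
    with patch:  # a second, overlapping user
        pass
    events_mid = list(events)  # inner exit did not remove the patch
```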

Rust tokenizer speedups

By default json-stream uses the json-stream-rs-tokenizer native extension.

This is a third-party Rust-based tokenizer implementation that provides a significant parsing speedup compared to the pure python implementation.

json-stream will fall back to its pure python tokenizer implementation if json-stream-rs-tokenizer is not available.

Custom tokenizer

You can supply an alternative JSON tokenizer implementation. Simply pass a tokenizer to the load() or visit() methods.

json_stream.load(f, tokenizer=some_tokenizer)

The requests methods also accept a custom tokenizer parameter.

Writing

The standard library's json.dump() function can only accept standard python types such as dict, list, str.

json-stream allows you to write streaming JSON output based on python generators instead.

For actually encoding and writing to the stream, json-stream still uses the standard library's json.dump() function, but provides wrappers that adapt python generators into dict/list subclasses that json.dump() can use.

This means that you do not have to generate all of your data upfront before calling json.dump().
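
The wrapper idea can be sketched in a few lines. This is a simplified illustration rather than json-stream's implementation: LazyList is a hypothetical class, and it relies on CPython's json.dump() encoding lists by iterating over them after a truthiness check.

```python
import io
import json


class LazyList(list):
    """Hypothetical list subclass whose items come from an iterable."""

    def __init__(self, iterable):
        super().__init__()
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def __bool__(self):
        # json.dump() checks truthiness to special-case empty lists
        return True


produced = []


def squares(n):
    for i in range(n):
        produced.append(i)  # record when each item is generated
        yield i * i


out = io.StringIO()
json.dump(LazyList(squares(3)), out)  # items are pulled one at a time
text = out.getvalue()
```

The sketch cuts corners that a real wrapper has to handle: an empty iterable would produce invalid output here, and json.dumps() may take a different code path that ignores the overridden __iter__.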

Usage

To use json-stream to generate JSON data iteratively, you must first write python generators (or use any other iterable).

To output JSON objects, the iterable must yield key/value pairs.

To output JSON lists, the iterable must yield individual items.

The values yielded can be either standard python types or another Streamable object, allowing lists and objects to be arbitrarily nested.

streamable_list/streamable_dict can be used to wrap an existing iterable:

import sys
import json

from json_stream import streamable_list

# wrap existing iterable
data = streamable_list(range(10))

# consume iterable with standard json.dump()
json.dump(data, sys.stdout)

Or they can be used as decorators on generator functions:

import json
import sys

from json_stream import streamable_dict

# declare a new streamable dict generator function
@streamable_dict
def generate_dict_of_squares(n):
    for i in range(n):
        # this could be some memory intensive operation
        # or just a really large value of n
        yield i, i ** 2

# data will already be Streamable because
# of the decorator
data = generate_dict_of_squares(10)
json.dump(data, sys.stdout)

Example

The following example generates a JSON object with a nested JSON list. It uses time.sleep() to slow down the generation and show that the output is indeed written as the data is created.

import sys
import json
import time

from json_stream.writer import streamable_dict, streamable_list


# define a list data generator that (slowly) yields 
# items that will be written as a JSON list
@streamable_list
def generate_list(n):
    # output n numbers
    for i in range(n):  # range is itself a lazy iterable
        yield i
        time.sleep(1)


# define a dictionary data generator that (slowly) yields 
# key/value pairs that will be written as a JSON dict
@streamable_dict
def generate_dict(n):
    # output n numbers and their squares
    for i in range(n):  # range is itself a lazy iterable
        yield i, i ** 2
        time.sleep(1)

    # yield another dictionary item key, with the value
    # being a streamed nested list
    yield "a list", generate_list(n)


# get a streamable generator
data = generate_dict(5)

# use json.dump() to write dict generator to stdout
json.dump(data, sys.stdout, indent=2)

# if you already have an iterable object, you can just
# call streamable_* on it to make it writable
data = streamable_list(range(10))
json.dump(data, sys.stdout)

Output:

{
  "0": 0,
  "1": 1,
  "2": 4,
  "3": 9,
  "4": 16,
  "a list": [
    0,
    1,
    2,
    3,
    4
  ]
}

What are the problems with the standard json package?

Reading with json.load()

The problems with json.load() stem from the fact that it must read the whole JSON document into memory before parsing it.

Memory usage

json.load() first reads the whole document into memory as a string. It then starts parsing that string and converting the whole document into python types again stored in memory. For a very large document, this could be more memory than you have available to your system.

json_stream.load() does not read the whole document into memory, it only buffers enough from the stream to produce the next item of data.

Additionally, in the default transient mode (see above) json-stream doesn't store up all of the parsed data in memory.

Latency

json.load() produces all the data only after parsing the whole document. If you only care about the first 10 items in a list of 2 million items, then you have to wait until all 2 million items have been parsed first.

json_stream.load() produces data as soon as it is available in the stream.

Writing

Memory usage

While json.dump() does iteratively write JSON data to the given file-like object, you must first produce the entire document to be written as standard python types (dict, list, etc). For a very large document, this could be more memory than you have available to your system.

json-stream allows you to iteratively generate your data one item at a time, and thus consumes only the memory required to generate that one item.

Latency

json.dump() can only start writing to the output file once all the data has been generated up front as standard python types.

The iterative generation of JSON items provided by json-stream allows the data to be written as it is produced.

Future improvements

  • Allow long strings in the JSON to be read as streams themselves
  • Allow transient mode on seekable streams to seek to data earlier in the stream instead of raising a TransientAccessException
  • A more efficient tokenizer?

Alternatives

NAYA

NAYA is a pure python JSON parser for parsing a simple JSON list as a stream.

Why not NAYA?

  • It can only stream JSON containing a top-level list
  • It does not provide a pythonic dict/list-like interface

Yajl-Py

Yajl-Py is a wrapper around the C YAJL JSON library that can be used to generate SAX style events while parsing JSON.

Why not Yajl-Py?

  • No pure python implementation
  • It does not provide a pythonic dict/list-like interface

jsonslicer

jsonslicer is another wrapper around the YAJL C library with a path lookup based interface.

Why not jsonslicer?

  • No pure python implementation
  • It does not provide a pythonic dict/list-like interface
  • Must know all data paths to look up in advance (or make multiple passes)

Contributing

See the project contribution guide.

Acknowledgements

The JSON tokenizer used in the project was taken from the NAYA project.