Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming ndjson input and streaming flattened ndjson output (#53) #69

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions jello/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import traceback
from textwrap import TextWrapper
import jello
from jello.lib import opts, load_json, read_file, pyquery, Schema, Json
from jello.lib import opts, load_json, read_file, pyquery, format_response


def ctrlc(signum, frame):
Expand All @@ -35,6 +35,7 @@ def print_help():
-C force color output even when using pipes (overrides -m)
-e empty data (don't process data from STDIN or file)
-f load input data from JSON file or JSON Lines files
-F flatten output list/iterator to newline-delimited json
-i initialize environment with .jelloconf.py
located at ~ (linux) or %appdata% (Windows)
-l output as lines suitable for assignment to a bash array
Expand Down Expand Up @@ -189,6 +190,7 @@ def main(data=None, query='_'):
opts.lines = opts.lines or 'l' in options
opts.empty = opts.empty or 'e' in options
opts.force_color = opts.force_color or 'C' in options
opts.flatten = opts.flatten or 'F' in options
opts.mono = opts.mono or ('m' in options or bool(os.getenv('NO_COLOR')))
opts.nulls = opts.nulls or 'n' in options
opts.raw = opts.raw or 'r' in options
Expand Down Expand Up @@ -240,26 +242,8 @@ def main(data=None, query='_'):
opts.mono = False

# Create and print schema or JSON/JSON-Lines/Lines
output = ''
try:
if opts.schema:
schema = Schema()
output = schema.create_schema(response)

if not opts.mono and (sys.stdout.isatty() or opts.force_color):
schema.set_colors()
output = schema.color_output(output)

else:
json_out = Json()
output = json_out.create_json(response)

if (not opts.mono and not opts.raw) and (sys.stdout.isatty() or opts.force_color):
json_out.set_colors()
output = json_out.color_output(output)

print(output)

format_response(response)
except Exception as e:
print_exception(e, data, query, response, ex_type='Output')

Expand Down
167 changes: 145 additions & 22 deletions jello/lib.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""jello - query JSON at the command line with python syntax"""

import collections.abc
import os
import sys
import types
Expand Down Expand Up @@ -57,6 +58,7 @@ class opts:
keyword_color = None
number_color = None
string_color = None
flatten = None


class JelloTheme:
Expand Down Expand Up @@ -347,6 +349,46 @@ def create_json(self, data):
# this code should not run, but just in case something slips by above
raise TypeError(f'Object is not JSON serializable')


def format_response(response):
"""Create schema or JSON/JSON-Lines/Lines"""

if opts.flatten:
it = None
if isinstance(response, collections.abc.Iterator):
it = response
elif isinstance(response, list):
it = iter(response)
else:
raise TypeError('-F/flatten requires the query to return an iterator/generator or list')
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just output the object or scalar here instead?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no technical reason this can't be done. In my opinion it suggests an error: the user requested flattening the output and then returned a value that can't be flattened.

There are some surprising results.
"jello -F" whose query returns 1 would output

1

"jello -F" whose query returns [1] would flatten and similarly output

1

In my mind this is a question of what is most intuitive and least likely to confuse or produce an unexpected result. For example, I considered letting the user return an iterable in addition to an iterator (so anything providing iter() to produce an iterator) but this would lead to confusing situations such as returning a string, whose iterator yields each character separately (returning "asdf" would output four lines: "a", "s", "d", and "f").

Similarly, I want to avoid the user trying to stream but accidentally materializing the entire result in memory (they use -S/-F and then start swapping or oom). To that end maybe allowing the query to return a list is a bad idea and it should always be an iterator (including a generator).

So it's really about what we want to prioritize: try to allow the greatest breadth of return types to just work, or making things as explicit as possible and avoiding expected pitfalls at the expense of a bit more apparent complexity and perhaps being a little less intuitive.

This is also where doing something like #67 splits the difference -- it changes the semantics so that the query is called multiple times, always receiving and returning scalars. But that shift away from a single call with the entire input being in "_" and the returned value containing the entire output will also confuse.

I'm going back and forth while writing this (which I think you are too). My opinion at this point is that if it's streaming then the output must be an iterator, including a generator. Remove even a list. It'll be tedious for cases where the user wants to return a single value but can be made explicit with a good error message including the recommendation to yield this one value.

But I'm not convinced. I just want to weigh the implications before introducing many options with complex semantics.

for item in it:
_format_single_response(item)
else:
_format_single_response(response)


def _format_single_response(response):
    """Render one response as a schema or as JSON and print it to stdout.

    An iterator response is drained into a list first so that it can be
    rendered as a single value. Output is colorized only when color is not
    disabled by the options and stdout is a TTY or color is forced.
    """
    if isinstance(response, collections.abc.Iterator):
        response = list(response)

    if opts.schema:
        formatter = Schema()
        rendered = formatter.create_schema(response)
        # schema output ignores the raw option when deciding on color
        wants_color = not opts.mono
    else:
        formatter = Json()
        rendered = formatter.create_json(response)
        # JSON output is left uncolored when either mono or raw is set
        wants_color = not (opts.mono or opts.raw)

    if wants_color and (sys.stdout.isatty() or opts.force_color):
        formatter.set_colors()
        rendered = formatter.color_output(rendered)

    print(rendered)


def load_json(data):
try:
Expand Down Expand Up @@ -397,22 +439,63 @@ def read_file(file_path):
with open(file_path, 'r') as f:
return f.read()

def pyquery(data, query, add_to_scope=None):
"""Sets options and runs the user's query."""
output = None

# read data into '_' variable
# if data is a list of dictionaries, then need to iterate through and convert all dictionaries to DotMap
if isinstance(data, list):
_ = [DotMap(i, _dynamic=False, _prevent_method_masking=True) if isinstance(i, dict)
else i for i in data]
def _compile_query(query):
"""
Compile the provided python code block into a function to transform json.

elif isinstance(data, dict):
_ = DotMap(data, _dynamic=False, _prevent_method_masking=True)
Wrapping in a function allows the block to yield/yield from.

else:
_ = data
The last statement, if an expression, will be converted into a return
statement. If the function does not return a generator this value will be
used to serialize json. If the function does return a generator, and so
this value will be the "value" of the StopIteration exception, the value
will later be discarded.

Returns the compiled AST with the function named "_jello_function". Any
free variables must be supplied by placing in "globals" when calling exec.
Once execed the function may be retrieved from the exec'ed "globals" and
then called.
"""

obj = ast.parse(query)
body = obj.body
if len(body) < 1:
raise ValueError('No query found.')
last_statement = body[-1]
if isinstance(last_statement, ast.Expr):
expression = last_statement.value
return_expr = ast.Return(
value=expression,
lineno=last_statement.lineno,
col_offset=last_statement.col_offset)
body[-1] = return_expr

function_def = ast.FunctionDef(
name="_jello_function",
args=ast.arguments(
posonlyargs=[],
args=[],
kwonlyargs=[],
kw_defaults=[],
defaults=[]),
body=body,
decorator_list=[],
returns=None,
type_comment=None,
lineno=0,
col_offset=0
)
m = ast.Module(
body=(
[function_def]
),
type_ignores=[]
)
return compile(source=m, filename="<string>", mode="exec")


def _inialize_config_and_options(_, add_to_scope):
# read initialization file to set colors, options, and user-defined functions
jelloconf = ''
conf_file = ''
Expand Down Expand Up @@ -478,23 +561,26 @@ def pyquery(data, query, add_to_scope=None):
scope.update(jcnf_dict)
if add_to_scope is not None:
scope.update(add_to_scope)
return jcnf_dict

# run the query
block = ast.parse(query, mode='exec')

if len(block.body) < 1:
raise ValueError('No query found.')
def _convert_output(output):
    """Convert a query result back to plain Python values.

    A non-iterator result is converted directly. An iterator result is
    wrapped in a generator that converts each item only as it is consumed,
    so the result is never materialized here.
    """
    if isinstance(output, collections.abc.Iterator):
        return (_convert_single_output(element) for element in output)
    return _convert_single_output(output)


def _convert_single_output(output):
# convert output back to normal dict
if isinstance(output, list):
output = [i.toDict() if isinstance(i, DotMap) else i for i in output]
return [i.toDict() if isinstance(i, DotMap) else i for i in output]

elif isinstance(output, DotMap):
output = output.toDict()
if isinstance(output, DotMap):
return output.toDict()

# if DotMap returns a bound function then we know it was a reserved attribute name
if hasattr(output, '__self__'):
Expand All @@ -503,5 +589,42 @@ def pyquery(data, query, add_to_scope=None):
return output


def pyquery(data, query, add_to_scope=None):
    """Sets options and runs the user's query.

    Parameters:
        data:          input made available to the query as `_`. A dict is
                       wrapped in a DotMap, as is each dict item of a list
                       or iterator. Iterator input is wrapped lazily. Any
                       other value is passed through unchanged.
        query:         python source of the user's query
        add_to_scope:  optional dict of extra names to expose to the query

    Returns the query result after it has been passed through
    _convert_output.
    """
    def to_dotmap(item):
        # only dictionaries are wrapped; every other value is left alone
        if isinstance(item, dict):
            return DotMap(item, _dynamic=False, _prevent_method_masking=True)
        return item

    # read data into '_' variable
    if isinstance(data, list):
        _ = [to_dotmap(i) for i in data]

    elif isinstance(data, dict):
        _ = to_dotmap(data)

    elif isinstance(data, collections.abc.Iterator):
        # generator expression so streamed input is not read up front
        _ = (to_dotmap(i) for i in data)

    else:
        _ = data

    jcnf_dict = _inialize_config_and_options(_, add_to_scope)

    # add any functions in initialization file to the scope
    scope = {'_': _, 'os': os}
    scope.update(jcnf_dict)

    # NOTE(review): the config helper appears to return only jcnf_dict, so
    # caller-supplied names are merged here as well; otherwise the query
    # would not see them. Merged last so they take precedence.
    if add_to_scope is not None:
        scope.update(add_to_scope)

    # run the query: exec defines _jello_function in scope, then call it
    exec(_compile_query(query), scope)
    result = scope['_jello_function']()

    return _convert_output(result)


if __name__ == '__main__':
pass
Loading