diff --git a/README.md b/README.md index b08c328..fd56e84 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,8 @@ [![CodeFactor](https://www.codefactor.io/repository/github/junkybyte/vispipe/badge/master?s=b4f0ed72fedffa8ed8cbc9bc9887a0db528a24b2)](https://www.codefactor.io/repository/github/junkybyte/vispipe/overview/master) ![Package Testing](https://github.com/JunkyByte/vispipe/workflows/Package%20Testing/badge.svg?branch=master) +![Preview](docs/preview.png) + VisPipe is a python package that can help you build pipelines by providing a convenient visual creation tool that can simplify the construction and debugging of otherwise blackbox pipelines. Once you are satisfied with the result a pipeline can be saved to file and run via code, the outputs of it can be easily interacted with through python code. @@ -16,172 +18,5 @@ git clone https://github.com/JunkyByte/vispipe.git pip install vispipe ``` -## Usage -A bunch of examples of the different features of the package. -Refer to the documentation to get a better view of each function arguments. - -### Blocks -A block is a function tagged with the decorator called (guess what) `block`. -```python -from vispipe import block - -@block -def identity_block(x): - return x -``` - -Or a class with a `run` method. -```python -@block -class identity_class_block: - def __init__(self): - # you can have attributes stored and used by run method - self.useless_value = 42 - pass - - def run(self, x): - return x -``` - -A block can have multiple inputs. - -```python -@block -def add(x, y): - return x + y - -# Or none -@block -def just_a_one(): - return 1 -``` - -All the inputs we defined right now are 'real' inputs and will be filled -by connecting the block to other blocks of the pipeline. -We may want to have static arguments as well, an input will become a static argument once we assign a default value to it. If you want to use the visualization you should also specify the type so that they can be parsed correctly. - -```python -@block -def add_constant(x, k: int = 42): - # x will be a 'real' input while k will be a static argument - return x + k -``` - -Now that you know how blocks work let's see how to create a pipeline, add and connect them. - -### Pipeline Building - -```python -from vispipe import Pipeline - -# Create a pipeline -p = Pipeline() - -# Add nodes by name -ones = p.add_node('just_a_one') -adder = p.add_node('add_constant', k = 10) # We can specify the value of static arguments during add_node -# The add_node function will return the unique identifier for the node, it is an integer and -# corresponds to the hash of the node, you will use it to interact with the node. - -# Connect nodes -# syntax: add_conn(from_hash, output_index, to_hash, input_index) -p.add_conn(ones, 0, adder, 0) - -# We have a working pipeline now! Let's start it -p.run() -# We can wait for it to end using -p.join(timeout=1) # It supports timeout argument similarly to Threading std library - -# In this case the pipeline will run indefinitely and we have not way to interact with it. -# Let's Add an output to it -p.add_output(adder) - -# If we now run it we can iterate over the outputs of adder -p.run() -for x in p.outputs[adder]: - print(x) - ->>> 11 # It will add 10 that is our constant to 1 which is the just_a_one output. -# (...) ->>> 11 # It will run indefinitely as there's no ending. -``` - -### Saving and reloading - -```python -# Once we are happy with our pipeline we can save it to pickle -p.save(file_path) - -# And reload it in a later stage -p = Pipeline() -p.load(file_path) - -# Or more concisely -p = Pipeline(path=file_path) -``` - -### Advanced Block creation -#### `Pipeline._empty` and `Pipeline.skip` objecs -You may have noticed that the flexibility of the blocks we created is pretty limited, we need to return a value at each call and we will always receive an input. -To overcome this there are two particular objects that get treated in a particular way: - -`Pipeline._empty` allows to specify that we do not want to return any result yet. -```python -# The example is based on the benchmark block from ops/utils.py -@block -class benchmark: - def __init__(self): - self.n = 1000 - self.start_time = None - - def run(self, x): - if self.start_time is None: - self.start_time = time.time() - - self.n -= 1 - if self.n == -1: # After 1000 iterations we return delta time - delta = time.time() - self.start_time - return delta - - # (...) missing code to manage the ending - - return Pipeline._empty # Otherwise we are not ready to return an output -``` - -`Pipeline._skip(value)` allows to return a value while also skipping the next input. -This is particularly useful when you need to iterate over an input. -```python -# The example is based on iterator from ops/flows.py -@block -class iterator: - def __init__(self): - self.iterator = None - - def run(self, x): - if self.iterator is None: - self.iterator = iter(x) - - try: - # As we can still iterate we return a skip object containing the next value - y = Pipeline._skip(next(self.iterator)) - except StopIteration: - # If we finished the iterator we return an empty so that we can wait for next input - self.iterator = None - y = Pipeline._empty - - return y -``` - -### Macro Blocks -Macro blocks are a convenient way to speed up a set of linearly connected blocks. -Blocks that are part of a macro will be run together (instead of connected with queues). -While this limits the flexibility of a part of the pipeline the functions will run a lot faster as they completely -skip the communication overhead. -(Please refer to documentation for a better explanation of this functionality) - -```python -# (...) -p.add_macro(start_hash, end_hash) # Will add a macro from start hash to end hash. - -p.remove_macro(node_hash) # Will delete the macro the node belongs to. -``` +## [Usage](docs/docs_md/usage.md) +## [Visualization](docs/docs_md/visualization.md) diff --git a/docs/docs_md/usage.md b/docs/docs_md/usage.md new file mode 100644 index 0000000..6bcb880 --- /dev/null +++ b/docs/docs_md/usage.md @@ -0,0 +1,176 @@ +# Usage + +A bunch of examples of the different features of the package. +Refer to the documentation to get a better view of each function arguments. + +### Blocks +A block is a function tagged with the decorator called (guess what) `block`. +```python +from vispipe import block + +@block +def identity_block(x): + return x +``` + +Or a class with a `run` method. +```python +@block +class identity_class_block: + def __init__(self): + # you can have attributes stored and used by run method + self.useless_value = 42 + pass + + def run(self, x): + return x +``` + +A block can have multiple inputs. + +```python +@block +def add(x, y): + return x + y + +# Or none +@block +def just_a_one(): + return 1 +``` + +All the inputs we defined right now are 'real' inputs and will be filled +by connecting the block to other blocks of the pipeline. +We may want to have static arguments as well, an input will become a static argument once we assign a default value to it. If you want to use the visualization you should also specify the type so that they can be parsed correctly. + +```python +@block +def add_constant(x, k: int = 42): + # x will be a 'real' input while k will be a static argument + return x + k +``` + +Now that you know how blocks work let's see how to create a pipeline, add and connect them. + +### Pipeline Building + +```python +from vispipe import Pipeline + +# Create a pipeline +p = Pipeline() + +# Add nodes by name +ones = p.add_node('just_a_one') +adder = p.add_node('add_constant', k = 10) # We can specify the value of static arguments during add_node +# The add_node function will return the unique identifier for the node, it is an integer and +# corresponds to the hash of the node, you will use it to interact with the node. + +# Connect nodes +# syntax: add_conn(from_hash, output_index, to_hash, input_index) +p.add_conn(ones, 0, adder, 0) + +# We have a working pipeline now! Let's start it +p.run() +# We can wait for it to end using +p.join(timeout=1) # It supports timeout argument similarly to Threading std library + +# In this case the pipeline will run indefinitely and we have not way to interact with it. +# Let's Add an output to it +p.add_output(adder) + +# If we now run it we can iterate over the outputs of adder +p.run() +for x in p.outputs[adder]: + print(x) + +>>> 11 # It will add 10 that is our constant to 1 which is the just_a_one output. +# (...) +>>> 11 # It will run indefinitely as there's no ending. +``` + +### Saving and reloading + +```python +# Once we are happy with our pipeline we can save it to pickle +p.save(file_path) + +# And reload it in a later stage +p = Pipeline() +p.load(file_path) + +# Or more concisely +p = Pipeline(path=file_path) +``` + +### Advanced Block creation +#### `Pipeline._empty` and `Pipeline.skip` objecs +You may have noticed that the flexibility of the blocks we created is pretty limited, we need to return a value at each call and we will always receive an input. +To overcome this there are two particular objects that get treated in a particular way: + +`Pipeline._empty` allows to specify that we do not want to return any result yet. +```python +# The example is based on the benchmark block from ops/utils.py +@block +class benchmark: + def __init__(self): + self.n = 1000 + self.start_time = None + + def run(self, x): + if self.start_time is None: + self.start_time = time.time() + + self.n -= 1 + if self.n == -1: # After 1000 iterations we return delta time + delta = time.time() - self.start_time + return delta + + # (...) missing code to manage the ending + + return Pipeline._empty # Otherwise we are not ready to return an output +``` + +`Pipeline._skip(value)` allows to return a value while also skipping the next input. +This is particularly useful when you need to iterate over an input. +```python +# The example is based on iterator from ops/flows.py +@block +class iterator: + def __init__(self): + self.iterator = None + + def run(self, x): + if self.iterator is None: + self.iterator = iter(x) + + try: + # As we can still iterate we return a skip object containing the next value + y = Pipeline._skip(next(self.iterator)) + except StopIteration: + # If we finished the iterator we return an empty so that we can wait for next input + self.iterator = None + y = Pipeline._empty + + return y +``` + +### Macro Blocks +Macro blocks are a convenient way to speed up a set of linearly connected blocks. +Blocks that are part of a macro will be run together (instead of connected with queues). +While this limits the flexibility of a part of the pipeline the functions will run a lot faster as they completely +skip the communication overhead. +(Please refer to documentation for a better explanation of this functionality) + +```python +# (...) +p.add_macro(start_hash, end_hash) # Will add a macro from start hash to end hash. + +p.remove_macro(node_hash) # Will delete the macro the node belongs to. +``` + +### An example +Loads the mnist dataset from a numpy array + the labels associated. +It then reshape the images to be actual `(28, 28)`, resize them to another resolution +and creates batches from them. + diff --git a/docs/docs_md/visualization.md b/docs/docs_md/visualization.md new file mode 100644 index 0000000..d62a6b3 --- /dev/null +++ b/docs/docs_md/visualization.md @@ -0,0 +1,39 @@ +# Visualization + +The visualization can be useful to build pipelines in an easier way, to have your custom +blocks inside the visualization you should import the `.py` files and launch the server +from there: +```python +from vispipe import Server +import my_custom_blocks +import my_other_custom_blocks + +if __name__ == '__main__': + # slow=True is recommended and will run the pipeline slowly to allow visualization + # path is the checkpoint path you want to use + Server(path, slow=True) +``` + +Once the server is launched connect to `localhost:5000`. + +### Add Nodes +The nodes have a tag that can be specified during declaration, you will find these tags during visualization. +```python +@block(tag='my_custom_tag') +def f(): + # (...) +``` +To spawn a node simply click it on the right side menu. +You can create switch between tags using the right top side arrows. + +### Add Connections +![Missing gif](https://media.giphy.com/media/idSuAhb6Wa6rRvzIkT/giphy.gif) + +### Set custom arguments +![Missing gif](https://media.giphy.com/media/USEG1wMYmUL12AUD2C/giphy.gif) + +### Add outputs +![Missing gif](https://media.giphy.com/media/VdQsT8i1gj2CUMH07D/giphy.gif) + +### Add visualization + diff --git a/docs/preview.png b/docs/preview.png new file mode 100644 index 0000000..99d505b Binary files /dev/null and b/docs/preview.png differ diff --git a/run_server.py b/run_server.py index baa54c5..df16d5a 100644 --- a/run_server.py +++ b/run_server.py @@ -2,4 +2,4 @@ import usage if __name__ == '__main__': - Server(slow=True, use_curses=False) + Server(slow=True, use_curses=True) diff --git a/tests/data/mnist.npy b/tests/data/mnist.npy index d3a391a..f5017f2 100644 Binary files a/tests/data/mnist.npy and b/tests/data/mnist.npy differ diff --git a/usage.py b/usage.py index e307d5a..ad88503 100644 --- a/usage.py +++ b/usage.py @@ -34,26 +34,35 @@ def run(self, x): return -1 -#pipeline = Pipeline() - -#img1 = pipeline.add_node('random_image') -#img2 = pipeline.add_node('random_image') -#add = pipeline.add_node('test_addition') -#plus1 = pipeline.add_node('test_plus1') -#plus2 = pipeline.add_node('test_plus100') -#timern = pipeline.add_node('benchmark', n=100000, log=True) - -#pipeline.add_conn(img1, 0, add, 0) -#pipeline.add_conn(img2, 0, add, 1) -#pipeline.add_conn(add, 0, plus1, 0) -#pipeline.add_conn(plus1, 0, plus2, 0) -#pipeline.add_conn(plus2, 0, timern, 0) - -#pipeline.add_macro(add, timern) - -#pipeline.run(slow=False, use_mp=False) -#pipeline.join() - -#pipeline.clear_pipeline() -#pipeline.save('./scratch_test.pickle') -#pipeline.load('./test.pickle') +# Create pipeline +#p = Pipeline() + +# Create nodes +#load_images = p.add_node('np_iter_file', path='tests/data/mnist.npy') +#load_labels = p.add_node('np_iter_file', path='tests/data/mnist_labels.npy') +#reshape = p.add_node('np_reshape', shape=(28, 28)) +#resize = p.add_node('resize_cv2', width=56, height=56) +#batch_images = p.add_node('batchify/images', size=32) +#batch_labels = p.add_node('batchify/labels', size=32) + +# Add connections +#p.add_conn(load_images, 0, reshape, 0) +#p.add_conn(reshape, 0, resize, 0) +#p.add_conn(resize, 0, batch_images, 0) +#p.add_conn(load_labels, 0, batch_labels, 0) + +# Set outputs +#p.add_output(batch_images) +#p.add_output(batch_labels) + +# Run it +#p.run(slow=False, use_mp=False) +#for batch_x, batch_y in zip(p.outputs['images'], p.outputs['labels']): +# print(batch_x.shape, batch_y.shape) + +# Save it +#p.save('./scratch_test.pickle') + +#p.join() +#p.clear_pipeline() +#p.load('./test.pickle') diff --git a/vispipe/ops/flows.py b/vispipe/ops/flows.py index 3b0d5d6..652cf82 100644 --- a/vispipe/ops/flows.py +++ b/vispipe/ops/flows.py @@ -1,5 +1,6 @@ from ..vispipe import Pipeline from ..vispipe import block +import numpy as np @block(tag='flows', intercept_end=True) @@ -36,20 +37,22 @@ class batchify: ---------- size : int The size of each bach. + to_array: bool + Whether you want the output as a np array or not. returns ------ Batch of inputs of the size specified. """ - - def __init__(self, size: int = 2): - self.buffer = [] + def __init__(self, size: int = 2, to_array: bool = True): self.size = size + self.to_array = to_array + self.buffer = [] def run(self, x): self.buffer.append(x) if len(self.buffer) == self.size or x is StopIteration: - x = self.buffer + x = np.array(self.buffer) if self.to_array else self.buffer self.buffer = [] return x if x is not StopIteration else x[:-1] return Pipeline._empty diff --git a/vispipe/ops/images.py b/vispipe/ops/images.py index 25eb8d6..3fca6db 100644 --- a/vispipe/ops/images.py +++ b/vispipe/ops/images.py @@ -4,7 +4,7 @@ import numpy as np -@block +@block(tag='images') def random_image(size: tuple = (28, 28)): """ Create a random image of shape specified. @@ -17,7 +17,7 @@ def random_image(size: tuple = (28, 28)): return np.concatenate([np.random.randint(0, 255, size=size + (3,)), np.ones(size + (1,)) * 255], axis=-1) -@block +@block(tag='images') def image_rgb(r, g, b, size: tuple = (28, 28)): """ Create an image with fixed color for r, g, b input channels (0 to 255). diff --git a/vispipe/server/server_gui.py b/vispipe/server/server_gui.py index 859b4d1..1ce1722 100644 --- a/vispipe/server/server_gui.py +++ b/vispipe/server/server_gui.py @@ -76,12 +76,16 @@ def __init__(self): self.screen, self.queue_win, self.log_win = wrapper(MainWindow) mh = CursesHandler(self.log_win) - formatter = logging.Formatter(' %(asctime) -25s - %(name) -15s - %(levelname) -10s - %(message)s') - formatterDisplay = logging.Formatter(' %(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') - mh.setFormatter(formatterDisplay) + log_formatter = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s") + mh.setFormatter(log_formatter) + + logger = logging.getLogger('vispipe') + logger.handlers = [] + logger = logging.getLogger('gui-vispipe') logger.handlers = [] logger.addHandler(mh) + sys.stdout = mh sys.stderr = mh @@ -102,8 +106,6 @@ def resize_handler(self, sig, frame): self.screen.refresh() def signal_handler(self, sig, frame): - print('[ Press Enter to exit ]') - self.screen.getch() curses.endwin() sys.exit(0) diff --git a/vispipe/server/static/javascript/js/application.js b/vispipe/server/static/javascript/js/application.js index 453cf89..2490a82 100644 --- a/vispipe/server/static/javascript/js/application.js +++ b/vispipe/server/static/javascript/js/application.js @@ -185,6 +185,7 @@ $(document).ready(function(){ block_dict.tag, block_dict.data_type); obj = pipeline.spawn_node_visual(block, nodes[i]); + pos = viewport.center if (vis_data[nodes[i]] !== undefined){ pos = new PIXI.Point(vis_data[nodes[i]][0], vis_data[nodes[i]][1]); } diff --git a/vispipe/server/static/javascript/js/structures.js b/vispipe/server/static/javascript/js/structures.js index 17f6f5f..04df535 100644 --- a/vispipe/server/static/javascript/js/structures.js +++ b/vispipe/server/static/javascript/js/structures.js @@ -211,16 +211,14 @@ class Pipeline { } spawn_node(block){ + console.log(block.custom_args) var self = this; socket.emit('new_node', block, function() { var closure = block; // Creates closure for block return function(response, status) { if (status === 200){ - block = closure; - block = new Block(block.name, block.input_args, block.custom_args, - block.custom_args_type, block.output_names, - block.tag, block.data_type); - self.spawn_node_visual(block, response.id) + let newblock = $.extend(true,{}, closure); + self.spawn_node_visual(newblock, response.id); } else { console.log(response); } @@ -406,6 +404,8 @@ class PopupMenu { this.currentNode = this.target.node; var block = this.target.node.block; var custom_args = block.custom_args; + console.log(block) + console.log(custom_args, this.currentNode.id) var custom_args_type = block.custom_args_type; var value, type, height; var x = CUSTOM_ARG_SIZE - 215; diff --git a/vispipe/server/vispipe_server.py b/vispipe/server/vispipe_server.py index 46f5bac..8688851 100644 --- a/vispipe/server/vispipe_server.py +++ b/vispipe/server/vispipe_server.py @@ -46,14 +46,14 @@ def __init__(self, PATH_CKPT='./scratch_test.pickle', slow=True, use_curses=True socketio.on_event('disconnect', self.disconnect) self.gui = None - if use_curses: + if use_curses and os.environ.get('WERKZEUG_RUN_MAIN') == 'true': self.gui = CursesQueueGUI() if not self.gui_thread.isAlive(): logging.info('Launching terminal queue visualization') self.gui_thread = Thread(target=self.set_qsize) self.gui_thread.daemon = True self.gui_thread.start() - time.sleep(0.2) + time.sleep(1) socketio.run(app) diff --git a/vispipe/vispipe.py b/vispipe/vispipe.py index 1bde583..d607807 100644 --- a/vispipe/vispipe.py +++ b/vispipe/vispipe.py @@ -2,7 +2,7 @@ from .graph import Graph from functools import partial, reduce from itertools import chain -from inspect import signature, isgeneratorfunction +from inspect import signature from typing import Callable, Optional, List, Union, Tuple, Any import queue import multiprocessing.queues as mpqueues @@ -725,12 +725,12 @@ def __init__(self): self.vis_source = {} self.outputs = {} - def read_vis(self): + def read_vis(self, sync=True): if not self.vis_source: return {} vis = {} - idx = self._vis_index() + idx = self._vis_index(sync=sync) if idx < 0: return {} # This will prevent a crash while waiting for queues to be ready @@ -741,9 +741,12 @@ def read_vis(self): return vis - def _vis_index(self): + def _vis_index(self, sync=True): + sizes = [vis.size() for vis in self.vis_source.values()] + if not sync: + sizes = filter(lambda x: x > 0, sizes) try: - return min(filter(lambda x: x > 0, [vis.size() for vis in self.vis_source.values()])) - 1 + return min(sizes) - 1 except ValueError: return -1 @@ -1046,8 +1049,6 @@ def block(f: Callable = None, max_queue: int = 10, output_names: List[str] = Non data_type=data_type, intercept_end=intercept_end, allow_macro=allow_macro) if isinstance(f, types.FunctionType): - #if not isgeneratorfunction(f): - # raise TypeError('The function you tagged is not a generator') if signature(f).parameters and list(signature(f).parameters.keys())[0] == 'self': raise TypeError( 'The function you passed is a class method, we only support functions right now') @@ -1056,8 +1057,6 @@ def block(f: Callable = None, max_queue: int = 10, output_names: List[str] = Non # Is a custom class so we need to process it differently (instantiate) assert hasattr( f, 'run'), 'The class %s you decorated must have a run method' % f.__name__ - #if not isgeneratorfunction(f.run): - # raise TypeError('The function you tagged is not a generator') is_class = True assert data_type in Pipeline.data_type