• Stars
    star
    108
  • Rank 319,466 (Top 7 %)
  • Language
    Java
  • License
    Apache License 2.0
  • Created over 10 years ago
  • Updated over 4 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Golang like channels for java

Lois

Golang like channels for java

Lois is a Java library that provides golang like channel abstraction and implementation. Go's channel abstraction is heavily influenced by Communicating Sequential Processes(CSP) and Process calculus. The pivotal idea behind concurrent process communicating over channels is

"Don't communicate by sharing state, share state by communicating"

Lois brings the power and flexibility of this concurrent computational paradigm to Java.

Channel

A conduit for communication and coordination

In Lois a channel is a mechanism for two independent threads of execution or Routines to either communicate or coordinate with each other. A channel can be typed and will carry a message only of the appropriate type, it can also be untyped allowing it to carry a message of any type.

/**
* This is a typed channel
*/
Channel<String> typedChannel = new SimpleChannel<String>();

/**
* This is an untyped channel
*/
Channel untypedChannel = new SimpleChannel();

Send and Receive

Send and receive are the most basic operations on a channel. The variants of these operations are the fundamental way in which threads and Routines use to communicate and coordinate with each other. Let's look at the basic send and receive operations over a channel.

/**
 * Thread 1 send's a message over a typed channel
 */
typedChannel.send(" Hello ");

/**
 * Thread 2 receives a message over a typed channel
 */
String message = typedChannel.receive();

In the above example we see how a channel can be used to send and receive messages between concurrent threads or Routines. Both send and receive can block and place the calling thread in a wait state until the message is either sendable or receivable.

Channel<String> typedChannel = new SimpleChannel<String>();

typedChannel.send(" Hello ");

/**
 * Since a SimpleChannel can carry only one message
 * at a time, calling send on the channel when the
 * previous message hasn't been "received" yet
 * blocks the thread and puts it in a wait state.
 */
typedChannel.send(" World! ");

Similarly receive blocks on a channel until there is a message to receive on it.

 /**
 * Receive blocks the thread and put's it in a wait
 * state until there is something to receive over
 * the channel.
 */
String message = typedChannel.receive();

One can use variants of send and receive with timeouts to avoid blocking threads indefinitely.

/**
 * A variant of send that takes a long and a TimeUnit to
 * timeout on a channel. The following code waits for 10
 * milliseconds and timesout to throw a  TimeoutException.
 */
typedChannel.send(" time's running out! ",10, TimeUnit.MILLISECONDS);

/**
 * A variant of receive that takes a long and a TimeUnit to
 * timeout on a channel. The following code waits for 10
 * milliseconds and timesout to throw a  TimeoutException.
 */
typedChannel.receive(10, TimeUnit.MILLISECONDS);

One can also use non blocking variants of send and receive.

/**
 * A non blocking variant of send that attempts to send a
 * message over the channel. It returns "true" if the
 * message could be successfully sent, or a false if the
 * message could not be sent over the channel.
 */
typedChannel.trySend(" trying to send ");

/**
 * A non blocking variant of receive that attempts to
 * receive a message over the channel. It returns the
 * message if a message was successfully received, or
 * a "null" if a message could not be received over
 * the channel.
 */
typedChannel.tryReceive();

One can also check whether a channel is ready to send or receive messages by calling isSendable and isReceivable on the channel. However, if a channel is being shared by multiple threads that send and multiple threads that receive then these can't be safely used to send/receive because the state of the channel could be modified by the time a send/receive is called.

/**
 * Return's true if the channel has space to accept
 * messages
 */
channel.isSendable();

/**
 * Return's true if the channel has atleast one message
 * that can be received.
 */
channel.isReceivable();

Closed for business

A channel can be in one of two states; either open or closed. By default, at creation all channel's are open and can send or receive messages freely. But a channel can be closed and once closed cannot be opened again.

/**
 * One can close a channel by calling close on it
 */
channel.close();

Once a channel is closed trying to send any message's over it will throw a ChannelClosedException. One can still receive all pending messages in the channel, but once all the pending messages have been received calling receive on the channel results in a ChannelClosedException.

channel.close();

/**
 * Throws a ChannelClosedException
 */
channel.send(" doomed to fail ");

One has to think carefully about how and when to close a channel. Since a channel could potentially be shared by multiple threads of execution, closing a channel would make it impossible for other channels to send messages over it. One can check whether a channel is open or closed in the following way.

/**
 * Return's true if the channel is open false if closed
 */
channel.isOpen();

Buffered and Simple channels

The difference between a Buffered and a Simple channel is the number of messages each can successfully hold. A simple channel can hold only one message in the channel.

/**
 * This is a simple channel
 */
Channel simpleChannel = new SimpleChannel();

simpleChannel.send("hello");

/**
 * This blocks if the first message isn't received yet
 * because a SimpleChannel has a capacity of one message
 */
simpleChannel.send("world");

A buffered channel on the other hand can hold a variable number of messages. The capacity of a buffered channel is specified at the time of creation.

/**
 * A buffered channel with a capacity of 3
 */
Channel bufferedChannel = new BufferedChannel(3);

channel.send(1);
channel.send(2);
channel.send(3);

/**
 * This blocks on send if the first 3 messages haven't
 * been received yet.
 */
channel.send(4);

Simple channels are useful for fine grained coordination while buffered channels are performant and useful when dealing with multiple or bursty senders and receivers.

Send only or Receive only channels

A channel by default can be used for full duplex communication, i.e it can be used to both send and receive messages by any thread that has access to it. However, most of the time a thread would only use a channel to either send or recieve messages exclusively. To enforce this behaviour one can use send or recieve channels

/**
 * This channel can only be used to send messages
 */
SendChannel sendChannel = new SimpleChannel();
sendChannel.send("I can send only");

/**
 * This channel can only be used to receive messages
 */
ReceiveChannel receiveChannel = new SimpleChannel();
receiveChannel.receive();

Routines

Routines are simple runnables that can be run by Lois on independent threads.

/**
* Simple routine that accepts a channel as a constructor
* parameter.
*/
Routine sampRoutine = new SampRoutine(stringChannel);

/**
* Start the routine on an independent thread which can then
* receive or send messages over the channel.
*/
Lois.go(sampRoutine);

One need not use routines to use channels. Any way of sharing reference to a channel by independent threads should enable them to use the channel to send and receive messages.

Value vs Reference

The value of "Don't communicate by sharing state, share state by communicating" can only be realized if there is no shared state among concurrent threads. To accomplish this one should refrain from sharing references to the same object, hence any message that is sent over a channel is passed on as a value rather than a reference. This is accomplished by deep cloning the message before sending it across. This makes sure that multiple threads can have access to the value of the message without a danger of shared state being accidentally modified.

However, there is one exception to this pass by value semantic. Any message that is a channel will be passed by reference. This ensures that a channel can be sent over channels while still retaining the ability to communicate/coordinate with any threads that still have a reference to the sent/received channel. This leads to incredible flexibility and power where channels can be used to dynamically alter the network of communicating and coordinating nodes at runtime.

If one can constrain their messages to be immutable, then they can take advantage of the more efficient pass by reference channel mechanisms. The BufferedPassByRefChannel or SimplePassByRefChannel are pass by reference alternatives to BufferedChannel and SimpleChannel.

Higher order channel usage

Lois also provides several simple ways of connecting channels together to create useful patterns.

Multiplexing several channels into one

The mux call multiplexes the messages from several source channels onto one sink channel.

/**
 * Send only channels that will be multiplexed
 */
SendChannel sourceChannel1 = new SimpleChannel();
SendChannel sourceChannel2 = new BufferedChannel(3);

/**
 * Receive only channel that will be used to output
 * the muxed messages
 */
ReceiveChannel combinedChannel = new SimpleChannel();

/**
 * A variadic method that muxes source channels into
 * the sink channel. It takes all messages recieved on
 * souceChannels and transfers them to the combined
 * channel.
 */
Lois.mux(combinedChannel,sourceChannel1, sourceChannel2);
Demultiplexing a single channel into several

The deMux call de-multiplexes the messages from a single source channel onto several sink channels.

/**
 * Receive only channel that will be Demultiplexed
 */
ReceiveChannel sourceChannel = new SimpleChannel();

/**
 * Send only channels that will be used to output
 * the Demuxed messages
 */
SendChannel sinkChannel1 = new SimpleChannel();
SendChannel sinkChannel2 = new SimpleChannel();

/**
 * A variadic method that Demuxes source channel into
 * the sink channels. It takes all messages recieved on
 * souceChannel and transfers them to exactly one of
 * the sink channels
 */
Lois.deMux(sourceChannel,sinkChannel1, sinkChannel2);
Multicasting

The multiCast call multicasts the messages from a single source channel onto several sink channels.

/**
 * Receive only channel that will be multicasted
 */
ReceiveChannel sourceChannel = new SimpleChannel();

/**
 * Send only channels that will be used to output
 * the multicasted messages
 */
SendChannel sinkChannel1 = new SimpleChannel();
SendChannel sinkChannel2 = new SimpleChannel();

/**
 * A variadic method that multicasts source channel into
 * the sink channels. It takes all messages recieved on
 * souceChannel and sends them on all of  the sink channels
 */
Lois.multiCast(sourceChannel,sinkChannel1, sinkChannel2);

These are just some simple ways in which channels can be combined, they are by no means exhaustive and similar higher order constructs between can be built with ease, one is only limited by one's imagination.

Examples

Simple parllelization

In this example we create a simple web page downloader using multiple parallel crawlers and a web page persister.

//Create a list to hold worker channels
List<Channel<WebPage>> crawlerChannels = new ArrayList<Channel<WebPage>>();

//create 10 crawlers each with a dedicated channel over which they will
//send the webpages they crawl.
for (int workerCount=0;workerCount<10;workerCount++){

    //create a crawlerChannel
    Channel<WebPage> crawlerChannel = new BufferedChannel<WebPage>(10);

    //run a crawler on an independent thread with a beginning url and
    //a crawlerChannel over which to send web pages
    Lois.go(new Crawler(getBeginUrl(), crawlerChannel));

    //add the crawler channel to list of crawlerchannels
    crawlerChannels.add(crawlerChannel);
}

//create a sink channel to consume messages from all the crawler channels
SendChannel<WebPage> sinkChannel = new BufferedChannel<WebPage>(10);

//multiplex crawler channels on to a sink channel
Lois.mux(sinkChannel, crawlerChannels);

//persist webpages on disk
Lois.go(new WebPagePersister(sinkChannel));

Rudimentary Connection Pool

In this example we create a simple, threadsafe connection pool.

//creates a list of 5 connections
List<Connection> connectionList = ConnectionFactory.createConnections("localhost", 80, 5);

//create a BufferedChannel to hold 5 connections
Channel<Connection> connectionPoolChannel = new BufferedChannel<Connection>(5);

for (Connection connection: connectionList){
    connectionPoolChannel.send(connection);
}

//to take a connection from the pool any thread can receive a connection
Connection connection = connectionPoolChannel.receive();

//to release a connection any thread can send it to the channel
connectionPoolChannel.send(connection);

Maven Artifact

Add the following repository to your pom.xml

    <repository>
      <id>clojars</id>
      <name>Clojars repository</name>
      <url>https://clojars.org/repo</url>
    </repository>

And add the following dependency to start using Lois in your maven project.

   <dependency>
     <groupId>com.flipkart.lego</groupId>
     <artifactId>lois</artifactId>
     <version>1.1.3</version>
   </dependency>

Documentation

The api docs can be found here

Contribution, Bugs and Feedback

For bugs, questions and discussions please use the Github Issues. Please follow the contribution guidelines when submitting pull requests.

License

Copyright 2014 Flipkart Internet, pvt ltd.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

More Repositories

1

Astra

Automated Security Testing For REST API's
Python
2,484
star
2

proteus

Proteus : A JSON based LayoutInflater for Android
Java
1,302
star
3

zjsonpatch

This is an implementation of RFC 6902 JSON Patch written in Java
Java
522
star
4

watchdog

Watchdog - A Comprehensive Security Scanning and a Vulnerability Management Tool.
Python
410
star
5

RTA

Red team Arsenal - An intelligent scanner to detect security vulnerabilities in company's layer 7 assets.
Python
409
star
6

springy-heads

Chat heads library for android
Java
370
star
7

android-inline-youtube-view

Utility library around using YouTube inside your android app.
Java
323
star
8

fk-visual-search

Flipkart's visual search and recommendation system
Python
172
star
9

flux

Highly scalable Event-driven, Reactive system for building Stateful apps and Workflow services.
Java
107
star
10

optimus

Train, evaluate and deploy Deep Learning based text classifiers. Currently supports CNN
Python
105
star
11

okhttp-stats

OkHttp Analytical library to get stats like average network speed. Also get global callbacks for network errors and successes. Can be used for logging errors to Fabric or Firebase analytical tools
Java
95
star
12

phrontend

A Framework to build rich UIs
JavaScript
88
star
13

ContentSheet

A simple control that enables presenting any view controller or navigation controller or any other object that can provide a view like an ActionSheet
Swift
82
star
14

hbase-orm

A production-grade HBase ORM library that makes accessing HBase clean, fast and fun (Can also be used as Bigtable ORM)
Java
78
star
15

madman-android

Madman (Media ads manager) is a high performance alternative to Google's standard IMA android SDK. If you have your own VAST server and want to render video ads and have full control over the UI, then this library is for you.
Kotlin
69
star
16

dkv

Distributed KV data store with tunable consistency, synchronous replication
Go
62
star
17

fk-ios-chatheads

Objective-C
58
star
18

batchman

This library for Android will take any set of events and batch them up before sending it to the server. It also supports persisting the events on disk so that no event gets lost because of an app crash. Typically used for developing any in-house analytics sdk where you have to make a single api call to push events to the server but you want to optimize the calls so that the api call happens only once per x events, or say once per x minutes. It also supports exponential backoff in case of network failures
Java
55
star
19

priority-kafka-client

Library to support priority queuing in Kafka
Java
54
star
20

animation-wrapper-view

Declarative animations with imperative controls for RN/RNW.
TypeScript
52
star
21

phantom

Phantom is a high performance proxy for accessing distributed services. It is an RPC system with support for different transports and protocols. Phantom is inspired by Twitter Finagle clients and builds on the capabilities of technologies like Netty, Unix Domain Sockets, Netflix Hystrix and Spring. Phantom proxies have been used to serve several hundred million API calls in production deployments at Flipkart.
Java
49
star
22

spark-transformers

Spark-Transformers: Library for exporting Apache Spark MLLIB models to use them in any Java application with no other dependencies.
Java
40
star
23

kafka-filtering

Very fast & efficient grep for Kafka stream
Java
38
star
24

databuilderframework

A data driven execution engine
Java
33
star
25

circular-image

Creates circular image drawable.
Java
30
star
26

android-studio-proteus-plugin

Plugin for Android studio which helps you to convert XML resources to JSON which can be used by proteus.
Java
29
star
27

storm-mysql

Storm Spout that reads off MySql Bin Logs
Java
29
star
28

ranger

Feature rich service discovery on ZooKeeper
Java
27
star
29

android-video-player

Kotlin
26
star
30

varadhi

Java
24
star
31

Poseidon

Platform to build API applications that have to aggregate data from distributed services in an efficient way.
Java
21
star
32

ios-inline-youtube-view

Utility library around using YouTube inside your iOS app.
Objective-C
20
star
33

grpc-jexpress

Developer friendly container for writing gRPC services using grpc-java
Java
17
star
34

hbase-k8s-operator

Hbase operator for kubernetes
Go
15
star
35

diligent

Run performance experiments on MySQL compatible databases
Go
15
star
36

StockPile

Provides in-memory and database based caching
Objective-C
14
star
37

Hunch

Hunch allows users to turn arbitrary machine learning models built using Python into a scalable, hosted service.
Python
14
star
38

scroll-coordinator-ios

ScrollCoordinator allows you to attach gestures to scrollviews and perform behaviours like Hiding the navigation bar, hiding the bottom bar and anchoring your scroll on these gestures.
Swift
13
star
39

gojira

Gojira is a record and replay framework for Java apps meant for regression testing. It provides complete recording capability within a single request-response scope, by recording request, response and any external interactions(outside of the jvm), thereby circumventing the need to provide a mock service.
Java
12
star
40

kubric

Android's co-ordinator layout ported to work on react native (Android, iOS and on the web)
TypeScript
11
star
41

babel-plugin-pseudolocalize-react-native

Babel plugin to transform React Native Text nodes with a custom language map for Pseudo-localization
JavaScript
11
star
42

pulsar-weighted-consumer

Pulsar consumer clients offering priority consumption
Java
10
star
43

phrontend-webpack

A webpack config maker for phrontend apps
JavaScript
10
star
44

kafka-balancer

Balance partitions among brokers with minimal data movement. Supports re-replication of under replicated partitions and setting replication as well.
Java
9
star
45

hbase-compactor

Trigger major compaction in Hbase
Java
9
star
46

BlueShift

Hadoop Data Mover Project
Java
8
star
47

android-RDX

Redux for Android.
Java
8
star
48

lenna

Go
8
star
49

Krystal

Java
7
star
50

Lyrics

Java
7
star
51

Iris-BufferQueue

A fast, in-process, persisted queue for buffering data on local host.
Java
7
star
52

hbase-sep

HBase side-effect-processor to stream changes from WAL to kafka sink.
Java
6
star
53

Cuppa

Caffe as a service and KNN as a service
Python
5
star
54

chronosq

⏱ A Scalable Distributed Scheduler
Java
5
star
55

hydra

A JVM based DispatcherComposer Engine
Java
5
star
56

mongodb-replicator

Mongodb Replicator java library for both sharded & non-sharded mongo setup.
Java
4
star
57

CTDrive

A tool which uses Google drive APIs internally and allows users to easily navigate through their files. This tool allows to maintain all organisation documents at one place similar to Confluence
TypeScript
4
star
58

polyglot

Tiered data store for use by on-line applications
Java
4
star
59

nexus

Sync replication using RAFT consensus onto pluggable backends
Go
4
star
60

dropwizard-one

Dropwizard One is a wrapper around Dropwizard that enables writing web services with minimal boilerplate code and good testability.
Java
4
star
61

Service-Worker-Tools

HTML
4
star
62

dropwizard-multitenancy

Java
4
star
63

bifrost

A remote execution framework over RabbitMQ.
Java
3
star
64

vbroker

high throughput PUSH based messaging
Java
3
star
65

wolverine

Master elected daemon, which heals itself
Java
3
star
66

prognos

Scala
3
star
67

truss

Javascript Framework
JavaScript
3
star
68

Lego

Library to build any entity (web/api response) in a scatter-gather fashion
Java
3
star
69

protobuf-util

Utility library for protobuf
Java
3
star
70

simpleJobScheduler

Java
3
star
71

ultra-docs

Documentation for Project Ultra : Flipkart's App-in-App experience
3
star
72

dsp

Java
2
star
73

tef

Java
2
star
74

molecule

ML platform with seamless experiment tracking and sharing. Supports multiple languages and platforms, meta-learning constructs and MLOps.
CSS
2
star
75

rediscast

Rediscast is an In-memory data grid, that synchronizes Entity changes in Redis server with all cluster nodes (JVM instances) within a short SLA. It uses Redis Streams for change data propagation.
Java
2
star
76

polyguice

A set of useful extensions to Google Guice and better integration with popular frameworks for creation of highly concurrent micro-services.
Java
2
star
77

portkey

Portkey is a Java model abstraction of persistence that works across multiple data stores and supports sharding. Entities can be persisted to more than one data store based on a set of rules.
Java
2
star
78

Swagger-publish

Java
2
star
79

ottoscalr

Ottoscalr provides a drop-in component for autonomous management of HPAs
Go
2
star
80

Regnator

Rule based decision engine. RESTful service where one could define the facts/variables and configures rules for the evaluation on the facts/variables and eventually choose one of the configured decision/result.
1
star
81

CoreDataLite

Lightweight core data library for iOS
Objective-C
1
star
82

statreduce

A library which to write Hadoop MapReduce jobs with map step in Java and reduce step in R for statistical computations
Java
1
star
83

homebrew-taps

Ruby
1
star
84

light-house

JavaScript
1
star
85

jupyterlab-extensions

Jupyterlab extensions
TypeScript
1
star
86

continuum

Lambda Architecture is not enough! Continuum is more fundamental
1
star
87

storm-sidelining

Java
1
star
88

turbo-relayer

Java
1
star
89

foxtrot-client

A service discovery client for foxtrot.
Java
1
star