Streams and Pipelines in EK9

This concept was touched upon in the Introduction; please read that section first, as this section goes into much more detail.

Concept

The concept of a pipeline process is present in the Unix shell and also in programming languages with chained methods (like filter, map and reduce). In the EK9 language the idea is to 'stream' a sequence of zero or more Objects through a notional sequence of commands. During the streaming, some of the commands may be configured to prevent further passage of Objects that have specific properties or characteristics.

But importantly as an Object passes through this notional sequence of commands it can also be 'transformed' into a different type. This may mean the data held is enriched or it could be simplified.

Finally the Objects (if any) emerging from the pipeline can be used in some way.

So there are three discrete phases to this streaming: starting the stream, pipeline processing, and collection or reduction.

Starting the stream

The EK9 language has two different ways to catenate Objects.

Just these two mechanisms are all that is needed to start a Stream pipeline. Any type of Object can be sent through the pipeline, including function delegates. This is very important if you wish to do any sort of 'asynchronous' processing.

Pipeline processing

The streaming mechanism uses the pipe '|' symbol for joining parts of the pipeline just like Unix and it includes:

But also adds:

Collection or Reduction

At the end of the pipeline zero or more Objects will flow out. A decision of what to do with these Objects must be taken. You can do one of three things with these Objects

When you are developing your own records and classes you can provide your own '|' operator implementation and then your types can be used as the terminal ('collector'/'sink') in a stream pipeline.

If you are familiar with Unix/Linux this type of syntax will be second nature to you.
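For readers more used to chained methods than to shell pipes, the three phases can be sketched in Python. This is only a rough analogue and not EK9 code; the names words, long_enough and shout are invented purely for illustration.

```python
# Rough Python analogue of the three phases (not EK9 code).
# All names here are hypothetical and exist only for this sketch.


def long_enough(word: str) -> bool:
    """Filter step: stops short words passing further down the pipeline."""
    return len(word) > 3


def shout(word: str) -> tuple[str, int]:
    """Map step: transforms a str into a different type, a (str, int) pair."""
    return (word.upper(), len(word))


# Phase 1: start the stream from a collection.
words = ["cat", "pipe", "sort", "map", "flatten"]

# Phase 2: pipeline processing (filter, then transform the type).
stream = map(shout, filter(long_enough, words))

# Phase 3: collect whatever emerges from the pipeline.
collected = list(stream)
print(collected)  # [('PIPE', 4), ('SORT', 4), ('FLATTEN', 7)]
```

Note how the type flowing through the pipeline changes from str to a pair at the map step; the same idea appears in the EK9 examples that follow.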

Examples

To aid understanding, there are a number of inline examples below. Each is followed by an explanation of how the code works.

There are some examples elsewhere on this site that show join, call and async. The '>>' is shown in the duration/date example.

For a 'reducing' or 'accumulating' example, see Integer pipeline.

If you need to stream Objects through a pipeline and then, at the final stage, keep only the final N of them based on some weighting criteria, consider using a priority queue configured to a finite size.

The Preamble and setup

The source code below uses an example of 'Books' and 'Authors' and demonstrates the rest of the Stream pipeline commands. It introduces the concept of a library of books that have been published. It then uses the stream pipeline functionality to extract information about the books. The first few examples are trivial, but the later ones build up quite a significant processing pipeline.

The same EK9 setup source code is used for all the examples, so it is shown here only once for brevity.

#!ek9
defines module introduction

  defines type
    
    AuthorId as Integer constrain
      > 0

    Age as Integer constrain
      > 0

    Name as String constrain as
      matches /^[a-zA-Z -]+$/

    BookTitle as String constrain as
      matches /^[a-zA-Z0-9 -+]+$/

  defines class

    Author
      id as AuthorId?
      age as Age?
      firstname as Name?
      surname as Name?

      Author() as pure
        ->
          id as AuthorId
          age as Age
          firstname as Name
          surname as Name
        assert id? and age? and firstname? and surname?
        this.id: AuthorId(id)
        this.age: Age(age)
        this.firstname: Name(firstname)
        this.surname: Name(surname)

      Author() as pure
        -> author as Author
        this(author.id(), author.age(), author.firstname(), author.surname())

      Author() as pure
        assert false

      id() as pure
        <- rtn as AuthorId: id

      age() as pure
        <- rtn as Age: age

      firstname() as pure
        <- rtn as Name: firstname

      surname() as pure
        <- rtn as Name: surname

      operator #? as pure
        <- rtn as Integer: #?id

      operator $ as pure
        <- rtn as String: "Author: " + firstname + " " + surname + " Age: " + $age

      operator ? as pure
        <- rtn as Boolean: age? and firstname? and surname?

      operator :=:
        -> author as Author
        id :=: author.id()
        age :=: author.age()
        firstname :=: author.firstname()
        surname :=: author.surname()

    Book
      title as BookTitle?
      author as Author?
      published as Date?

      Book() as pure
        ->
          title as BookTitle
          author as Author
          published as Date?
        assert title? and author? and published?
        this.title: BookTitle(title)
        this.author: Author(author)
        this.published: Date(published)

      author() as pure
        <- rtn as Author: author

      published() as pure
        <- rtn as Date: published

      operator $ as pure
        <- rtn as String: "Title: " + $title + ", " + $author + " Published: " + $published

      operator #^ as pure
        <- rtn as String: $this

    Library
      //These are made up books and Age is when the author wrote the book
      //There are no books on EK9 yet!
      books as List of Book: [
        Book(BookTitle("Java"), Author(AuthorId(1), Age(50), Name("John"), Name("Doe")), 1998-01-01),
        Book(BookTitle("C++"), Author(AuthorId(1), Age(42), Name("John"), Name("Doe")), 1990-01-07),
        Book(BookTitle("Scala"), Author(AuthorId(1), Age(67), Name("John"), Name("Doe")), 2015-03-02),
        Book(BookTitle("Python"), Author(AuthorId(1), Age(62), Name("John"), Name("Doe")), 2010-12-02),
        Book(BookTitle("HTML"), Author(AuthorId(2),Age(58), Name("Mark"), Name("Pickford")), 2008-07-02),
        Book(BookTitle("CSS"), Author(AuthorId(4), Age(51), Name("Mark"), Name("Keely")), 2008-04-02),
        Book(BookTitle("ADA"), Author(AuthorId(5), Age(44), Name("Ada"), Name("Lovelace")), 1988-01-02),
        Book(BookTitle("Dart"), Author(AuthorId(6), Age(47), Name("Peter"), Name("Dove")), 2020-01-02),
        Book(BookTitle("C#"), Author(AuthorId(7), Age(60), Name("William"), Name("Fence")), 2012-10-02),
        Book(BookTitle("Javascript"), Author(AuthorId(3), Age(52), Name("James"), Name("Pickford")), 2008-03-02),
        Book(BookTitle("C"), Author(AuthorId(1), Age(42), Name("John"), Name("Doe")), 1990-01-07),
        Book(BookTitle("C++"), Author(AuthorId(7), Age(38), Name("William"), Name("Fence")), 1990-04-02),
        Book(BookTitle("C"), Author(AuthorId(7), Age(38), Name("William"), Name("Fence")), 1990-04-14),
        Book(BookTitle("Haskell"), Author(AuthorId(7), Age(30), Name("William"), Name("Fence")), 1982-04-14),
        Book(BookTitle("Lisp"), Author(AuthorId(7), Age(25), Name("William"), Name("Fence")), 1977-09-24)
        ]

      books() as pure
        <- rtn as List of Book: books        
 ...

The Model

The model is quite simple: a few types, plus classes for Books, Authors and the Library. As this is a stream pipeline example and more functional, the pure keyword has been used to highlight immutability (actually it's more than a highlight, as the compiler enforces it). Strictly speaking, clones/copies of properties should have been taken when returning values with accessor methods, as this would then prevent any alterations being made to Objects provided or returned.

The Programs

The first program is the simplest and just outputs all the books to 'Standard Out'.

Program 1

...
  defines program

    JustCatBooks()
      stdout <- Stdout()
      books <- Library().books()
      cat books > stdout
      //Could have been written as
      //cat Library().books() > Stdout()
//EOF

The output of the above program is as follows:

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
        

The output is unsurprising, as no real processing has taken place. Note that by providing the '#^' (promote) operator on the Book class, Objects of type Book can be sent straight to stdout, as they get converted (promoted) to a String.

Program 2

This program just sorts the Books by Author ID and outputs all the books to 'Standard Out'.

The use of sort is the first example of a stream pipeline command; it takes a single parameter, which is the name of a function. It could have been anything that provided a function, including a call to a higher order function. It could also be a dynamic function. In this case a simple pure function has been used.

By building a library of short pure functions together with a set of records, traits and classes you can optimise your code reuse and importantly create a set of unit tests and examples of how they can be used.

...
  defines function

    compareAuthor() as pure
      ->
        book1 as Book
        book2 as Book
      <-
        rtn as Integer: book1.author().id() <=> book2.author().id()

  defines program

    SortBooksByAuthor()
      stdout <- Stdout()
      books <- Library().books()
      cat books | sort by compareAuthor > stdout
//EOF

The output of the program is as follows:

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
        

The output is ordered as you would expect on the Author ID.
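As a point of comparison, the idea of passing a named comparator function to a sort step can be sketched in Python. This is a hedged analogue rather than EK9: the tuples stand in for Book objects, compare_author is a hypothetical counterpart of compareAuthor, and the data is a small subset of the Library above.

```python
from functools import cmp_to_key

# (title, author_id) pairs standing in for Book objects (illustrative subset).
books = [("Java", 1), ("HTML", 2), ("C#", 7), ("Javascript", 3), ("C", 1)]


def compare_author(book1: tuple[str, int], book2: tuple[str, int]) -> int:
    """Return a negative, zero or positive Integer, in the spirit of '<=>'."""
    return (book1[1] > book2[1]) - (book1[1] < book2[1])


# The comparator is passed by name, much like 'sort by compareAuthor'.
by_author = sorted(books, key=cmp_to_key(compare_author))
print(by_author)
# [('Java', 1), ('C', 1), ('HTML', 2), ('Javascript', 3), ('C#', 7)]
```

Python's sorted is stable, which is why 'Java' stays ahead of 'C' for author 1 in this sketch.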

Program 3

John Doe has two books published on the same date (1990-01-07). This next example groups books by the same author if they were published on the same date. Note that it does not yet order the books by author and by published date (that is yet to come).

...
  defines function

    dateBookPublished() as pure
      -> book as Book
      <- rtn as Date: book.published()

  defines program

    GroupBooksByPublishedDate()
      stdout <- Stdout()
      books <- Library().books()
      cat books | sort by compareAuthor | group by dateBookPublished > stdout
//EOF

The output of the program is as follows:

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
        

Now the books on 'C' and 'C++' by John Doe are grouped together.

Program 4

This example program now orders each author's books by their published date, while keeping the books ordered by author.

...
  defines function

    differentAuthor() as pure abstract
      -> book as Book
      <- rtn as Boolean
      
    getAuthorChange() as pure
      <- authorChanged as differentAuthor
      byId <- AuthorId()
      authorChanged: (byId) is differentAuthor()
        rtn: byId? and byId != book.author().id()        
        byId: book.author().id()

    compareDatePublished() as pure
      ->
        book1 as Book
        book2 as Book
      <-
        rtn as Integer: book1.published() <=> book2.published()

    orderOnPublishedDate() as pure
      -> books as List of Book
      <- rtn as List of Book: cat books | sort by compareDatePublished | collect as List of Book

  defines program
    ProcessByAuthor()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()

      cat books | sort by compareAuthor | split by authorId | map by orderOnPublishedDate | flatten > stdout
//EOF

Now this is a little more interesting!
Again the list is ordered by Author ID first. But then the stream is split into Lists of Books by each specific author.

The function delegate authorId is provided by the higher order function getAuthorChange. This higher order function creates a 'stateful' dynamic function that can track the Author ID as each Book streams through the function (hence it is important that the books are in Author order). It is not pure as it holds state that is altered. If you're struggling with this code take a look back at Dynamic Functions.

For some programmers this approach may seem strange; using loops and a state variable might be more obvious. All the function does is return a Boolean true/false indicating whether the Author ID has changed; that's it. The Stream pipeline deals with creating and sending lists as the output of the split command.

So in some ways split acts like map and turns a stream of 'Book' (in this case) into a stream of 'List of Book', but based on the Author ID changing.

The next command in the pipeline is map, which uses the function orderOnPublishedDate. Note here that the Object being streamed through the pipeline has now become a 'List of Book', not just 'Book'. So the function orderOnPublishedDate accepts a 'List of Book' and itself uses a Stream pipeline to order that list. It returns a totally new (but ordered) list; here you can see collect as being used, which is a stream pipeline terminal command.

Now we need a stream of 'Book' again and not a stream of 'List of Book', so just use flatten to take the contents of each List and output the Books in sequence. Note that flatten can be used with Optional as well.

One of the issues with Stream pipelines (and chaining methods in other languages) is 'knowing' what the type is at any point along the pipeline, as this type can be altered during processing. EK9 makes this easy because of strong and explicit typing of functions and methods.

For example, sort never alters the type, so whatever the sort function uses for parameters is the pipeline type. Split always produces a 'List' of the current pipeline type. But map is the key command; this is where types can be transformed. Look at the input type and the output type of the function it uses. Finally, flatten just takes something like a 'List of Book' and outputs 'Book' as the pipeline type (i.e. each Book in that List).
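The way the pipeline type moves from 'Book' to 'List of Book' and back again can be sketched with a rough Python analogue. This is not EK9 and not how EK9 implements split; split_by, get_author_change and the tuple layout are hypothetical names chosen for this illustration, and the data is a small subset of the Library.

```python
from itertools import chain
from typing import Callable, Iterable, Iterator, Optional

Book = tuple[str, int, str]  # (title, author_id, published as ISO date text)


def get_author_change() -> Callable[[Book], bool]:
    """Higher order function: returns a stateful 'has the author changed' check."""
    last_id: Optional[int] = None

    def author_changed(book: Book) -> bool:
        nonlocal last_id
        changed = last_id is not None and last_id != book[1]
        last_id = book[1]  # remember the id for the next Book
        return changed

    return author_changed


def split_by(items: Iterable[Book], starts_new: Callable[[Book], bool]) -> Iterator[list[Book]]:
    """Turn a stream of Book into a stream of list of Book."""
    group: list[Book] = []
    for item in items:
        if starts_new(item) and group:
            yield group
            group = []
        group.append(item)
    if group:
        yield group


def order_on_published_date(books: list[Book]) -> list[Book]:
    """list of Book in, new ordered list of Book out."""
    return sorted(books, key=lambda book: book[2])


books = [
    ("Java", 1, "1998-01-01"),
    ("C#", 7, "2012-10-02"),
    ("C++", 1, "1990-01-07"),
    ("HTML", 2, "2008-07-02"),
    ("Lisp", 7, "1977-09-24"),
]

by_author = sorted(books, key=lambda book: book[1])  # type: Book
groups = split_by(by_author, get_author_change())    # type: list of Book
ordered = map(order_on_published_date, groups)       # type: list of Book
flattened = list(chain.from_iterable(ordered))       # type: Book again

titles = [book[0] for book in flattened]
print(titles)  # ['C++', 'Java', 'HTML', 'Lisp', 'C#']
```

As in the EK9 version, the stateful check only works because the stream has already been sorted by author before it is split.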

Summary

This processing has been somewhat more sophisticated, but each function has a single simple purpose, and they can and will be reused (and can be tested in isolation). Importantly the stream pipeline itself remains very readable, succinct and easy to understand. But there's no doubt this approach takes more thought to design and implement than an imperative loop with state variables!

The output of the program is as follows:

Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: HTML, Author: Mark Pickford Age: 58 Published: 2008-07-02
Title: Javascript, Author: James Pickford Age: 52 Published: 2008-03-02
Title: CSS, Author: Mark Keely Age: 51 Published: 2008-04-02
Title: ADA, Author: Ada Lovelace Age: 44 Published: 1988-01-02
Title: Dart, Author: Peter Dove Age: 47 Published: 2020-01-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
        

As you can see the list of books is now in both author order and book publish date order (for that author).

Program 5

This example builds on the last one, but limits the output to authors with three or more books.

...
  defines function

    sufficientBooks() as pure
      -> books as List of Book
      <- rtn as Boolean: length books >= 3

  defines program

    ProcessByAuthorWithThreeOrMoreBooks()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()

      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | flatten
        > stdout
//EOF

Because the pipeline has now become quite long, the alternative vertical layout has been employed. In general vertical layouts make code much more readable (though they increase page length!).

This example uses the filter/select command to only allow a 'List of Book' where there are three or more books in the list.

The output of the program is as follows:

Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
        

Program 6

Again this example builds on the last, but just omits the first two books by authors that have three or more books published.

...
  defines function
  
    bookNumberFilter() as pure abstract
      -> books as List of Book
      <- rtn as List of Book
       
  defines program

    ProcessByAuthorWithThreeOrMoreBooksIgnoreFirstTwo()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()
      
      excludingFirstTwoBooks <- () is bookNumberFilter as pure function
        limited <- cat books | skip 2 | collect as List of Book
        rtn: limited
        
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map by excludingFirstTwoBooks
        | flatten
        > stdout
      
//EOF

The output of the program is as follows:

Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
        

The following two examples follow the same theme, showing just the first and just the last book by each author with three or more books. Introducing the abstract function bookNumberFilter makes it possible to define the dynamic function excludingFirstTwoBooks.

The dynamic function excludingFirstTwoBooks could have been written as a standard function. Alternatively the dynamic function could have been returned through the use of a higher order function.

Program 7

...
  defines program

    ProcessByAuthorWithThreeOrMoreBooksJustFirst()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()
      
      firstBook <- () is bookNumberFilter as pure function
        limited <- cat books | head 1 | collect as List of Book
        rtn: limited
        
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with firstBook
        | flatten
        > stdout

//EOF

The output of the program is as follows:

Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
        

Program 8

...
  defines program

    ProcessByAuthorWithThreeOrMoreBooksJustLast()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()
      lastBook <- () is bookNumberFilter as pure function
        limited <- cat books | tail 1 | collect as List of Book
        rtn: limited
        
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with lastBook
        | flatten
        > stdout

//EOF

The output of the program is as follows:

Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
        

Reuse

If you have a range of requirements that need different views/summaries or filters on a set of data, the development of a number of reusable functions (as above) demonstrates how you can maximize code reuse.

While it is possible to use an imperative 'loop' style with loop variables and intermediate collections built up during processing, none of this tends to be reusable, as it is nested in the loops and tied to their conditions.

Clearly if you only have a single simple requirement to access the latest book published by author ID=1 you might be tempted by a simple loop. In general (depending on the longevity of the software) it is almost inevitable that additional queries will be required. By adopting the Stream pipeline approach early you may have to write slightly more code, but then it will be obvious how to extend it and, more importantly, reuse what you've already written. Moreover the design pattern is a known one rather than being some form of while, do-while or iterator loop. A short example of this is shown later.
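To make the trade-off concrete, here is a rough Python analogue (not EK9) of the 'latest book published by author ID=1' query, written once as a loop and once as a small pipeline of named functions. The tuple layout and the function names by_author_one and published are assumptions made for this sketch only.

```python
# (title, author_id, published as ISO date text); illustrative subset of the Library.
books = [
    ("Java", 1, "1998-01-01"),
    ("Scala", 1, "2015-03-02"),
    ("HTML", 2, "2008-07-02"),
    ("Python", 1, "2010-12-02"),
]

# Imperative style: the selection and comparison logic is locked inside the loop.
latest = None
for book in books:
    if book[1] == 1 and (latest is None or book[2] > latest[2]):
        latest = book


# Pipeline style: each step is a small named function that can be reused and tested.
def by_author_one(book: tuple[str, int, str]) -> bool:
    return book[1] == 1


def published(book: tuple[str, int, str]) -> str:
    return book[2]


latest_via_pipeline = max(filter(by_author_one, books), key=published)

print(latest)               # ('Scala', 1, '2015-03-02')
print(latest_via_pipeline)  # ('Scala', 1, '2015-03-02')
```

The loop is shorter for this single query, but a second query (say, the earliest book by another author) can reuse published unchanged and only needs a different filter function.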

Program 9

Below is an example of using an enumeration and a higher order function; this returns a function based on the enumeration value.

...
  defines type

    BookFilterSelection
      SkipTwo
      JustFirst
      JustLast
        
  defines function

    suitableBookFilter()
      -> selection as BookFilterSelection
      <- bookFilter as bookNumberFilter
      
      bookFilter: switch selection
        <- theFilter as bookNumberFilter
        case BookFilterSelection.SkipTwo
          theFilter: () is bookNumberFilter as pure function
            limited <- cat books | skip 2 | collect as List of Book
            rtn: limited
        case BookFilterSelection.JustFirst
          theFilter: () is bookNumberFilter as pure function
            limited <- cat books | head 1 | collect as List of Book
            rtn: limited
        case BookFilterSelection.JustLast
          theFilter: () is bookNumberFilter as pure function
            limited <- cat books | tail 1 | collect as List of Book
            rtn: limited
        default
          theFilter: () is bookNumberFilter as pure function            
            rtn: books

  defines program
  
    ProcessByAuthorUsingHigherOrderFunction()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()
      
      stdout.println("Omit first two books where author has three or more books")
      excludingFirstTwoBooks <- suitableBookFilter(BookFilterSelection.SkipTwo)
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with excludingFirstTwoBooks
        | flatten
        > stdout
      
      stdout.println("First book where author has three or more books")
      firstBook <- suitableBookFilter(BookFilterSelection.JustFirst)
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with firstBook
        | flatten
        > stdout
      
      stdout.println("Last book where author has three or more books")
      lastBook <- suitableBookFilter(BookFilterSelection.JustLast)
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with lastBook
        | flatten
        > stdout
//EOF

The example above just shows how an enumeration can be used with a higher order function to 'switch' on the enumeration to return an appropriate dynamic function. From this example it's quite easy to see how the above pipeline code can be refactored again, so that it too can be wrapped into a function.
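The same shape (an enumeration selecting which function a higher order function returns) can be sketched in Python. This is a hedged analogue and not EK9; the slices only approximate what skip 2, head 1 and tail 1 do to a list, and every name below is a hypothetical counterpart of the EK9 names above.

```python
from enum import Enum, auto
from typing import Callable

BookNumberFilter = Callable[[list[str]], list[str]]


class BookFilterSelection(Enum):
    SKIP_TWO = auto()
    JUST_FIRST = auto()
    JUST_LAST = auto()


def suitable_book_filter(selection: BookFilterSelection) -> BookNumberFilter:
    """Higher order function: returns a filter chosen by the enumeration value."""
    if selection is BookFilterSelection.SKIP_TWO:
        return lambda books: books[2:]   # roughly 'skip 2'
    if selection is BookFilterSelection.JUST_FIRST:
        return lambda books: books[:1]   # roughly 'head 1'
    if selection is BookFilterSelection.JUST_LAST:
        return lambda books: books[-1:]  # roughly 'tail 1'
    return lambda books: list(books)     # default: pass everything through


# Titles of one author's books, already in published-date order.
titles = ["C++", "C", "Java", "Python", "Scala"]

skipped = suitable_book_filter(BookFilterSelection.SKIP_TWO)(titles)
first = suitable_book_filter(BookFilterSelection.JUST_FIRST)(titles)
last = suitable_book_filter(BookFilterSelection.JUST_LAST)(titles)

print(skipped)  # ['Java', 'Python', 'Scala']
print(first)    # ['C++']
print(last)     # ['Scala']
```

Because every returned function has the same signature, the caller can drop any of them into the same map step without caring which one it received.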

Program 10

This program is a simple refactoring of the one above, to highlight functional composition and how useful strong typing of functions and higher order functions can be.

...
  defines function

    filterBooksToOutput()
      ->
        books as List of Book
        filterSelection as BookFilterSelection
        output as StringOutput
        
      authorId <- getAuthorChange()  
      theBookFilter <- suitableBookFilter(filterSelection)
      cat books
        | sort by compareAuthor
        | split by authorId
        | select with sufficientBooks
        | map by orderOnPublishedDate
        | map with theBookFilter
        | flatten
        > output

  defines program

    ProcessByAuthorUsingParameterisedFunction()
      stdout <- Stdout()
      books <- Library().books()
              
      stdout.println("Omit first two books where author has three or more books")
      filterBooksToOutput(books, BookFilterSelection.SkipTwo, stdout)
      
      stdout.println("First book where author has three or more books")
      filterBooksToOutput(books, BookFilterSelection.JustFirst, stdout)
           
      stdout.println("Last book where author has three or more books")      
      filterBooksToOutput(books, BookFilterSelection.JustLast, stdout)
                
//EOF

By mixing in simple (enumerated) types and the switch expression it is possible to see how a range of different solutions can be structured with a number of small reusable functions.

When linked with the polymorphism of traits, classes and importantly functions the whole solution is more flexible and fungible in terms of which actual filter functions are used and also where results are output.

By taking the time to create abstract functions, simple types and records with a select number of operators, refactoring and functional composition become much easier.

The output is as you would expect; the same as the previous example.

Omit first two books where author has three or more books
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
First book where author has three or more books
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Last book where author has three or more books
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02       
        

Program 11

This example is slightly different and demonstrates how parts of the processing pipeline can be tee'd off and saved to be used later. It also demonstrates how streamed items can be removed if duplicated by some key or other (in this case the publication date is used).

...
  defines function

    bookSigningEvent() as pure
      -> book as Book
      <- rtn as String: "Date: " + $book.published()

  defines program

    UniquePublishingDatesFromAuthorWithThreeOrMoreBooks()
      stdout <- Stdout()
      books <- Library().books()

      authorId <- getAuthorChange()
      authorsBooks as List of Book: List()

      booksByEachAuthor as List of List of Book: List()

      stdout.println("Unique signing events on day of publication by authors with three or more books.")

      cat books
        | sort by compareAuthor
        | split by authorId        
        | tee in booksByEachAuthor
        | filter by sufficientBooks
        | map by orderOnPublishedDate
        | flatten
        | tee in authorsBooks
        | uniq by dateBookPublished
        | sort by compareDatePublished
        | map with bookSigningEvent
        > stdout

      stdout.println("Books From author with three or more books")
      cat authorsBooks > stdout

      stdout.println("There are " + $ length booksByEachAuthor + " authors in total with any number of books")
//EOF

The output of the program is as follows:

Unique signing events on day of publication by authors with three or more books.
Date: 1977-09-24
Date: 1982-04-14
Date: 1990-01-07
Date: 1990-04-02
Date: 1990-04-14
Date: 1998-01-01
Date: 2010-12-02
Date: 2012-10-02
Date: 2015-03-02
Books From author with three or more books
Title: C++, Author: John Doe Age: 42 Published: 1990-01-07
Title: C, Author: John Doe Age: 42 Published: 1990-01-07
Title: Java, Author: John Doe Age: 50 Published: 1998-01-01
Title: Python, Author: John Doe Age: 62 Published: 2010-12-02
Title: Scala, Author: John Doe Age: 67 Published: 2015-03-02
Title: Lisp, Author: William Fence Age: 25 Published: 1977-09-24
Title: Haskell, Author: William Fence Age: 30 Published: 1982-04-14
Title: C++, Author: William Fence Age: 38 Published: 1990-04-02
Title: C, Author: William Fence Age: 38 Published: 1990-04-14
Title: C#, Author: William Fence Age: 60 Published: 2012-10-02
There are 7 authors in total with any number of books
        

The interesting part in the example above is the multiple use of the tee command to take contents as they stream through the pipeline to be processed and store them in separate collections.

The output is limited by using the uniq command with a key function (in this case dateBookPublished, which uses the date the book was published as the uniqueness key).
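The two ideas, copying items aside as they pass and dropping items whose key has already been seen, can be sketched as Python generators. This is only an analogue and not EK9; tee_into and uniq_by are hypothetical helper names, and keeping the first item seen for each key is an assumption of this sketch.

```python
from typing import Callable, Hashable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def tee_into(items: Iterable[T], store: list[T]) -> Iterator[T]:
    """Copy each item into store as it passes, then hand it on unchanged."""
    for item in items:
        store.append(item)
        yield item


def uniq_by(items: Iterable[T], key: Callable[[T], Hashable]) -> Iterator[T]:
    """Pass on only the first item seen for each key (an assumption here)."""
    seen: set[Hashable] = set()
    for item in items:
        item_key = key(item)
        if item_key not in seen:
            seen.add(item_key)
            yield item


# (title, published as ISO date text); illustrative subset of the Library.
books = [("C++", "1990-01-07"), ("C", "1990-01-07"), ("Java", "1998-01-01")]

seen_books: list[tuple[str, str]] = []
unique = uniq_by(tee_into(books, seen_books), key=lambda book: book[1])
dates = [book[1] for book in unique]

print(dates)            # ['1990-01-07', '1998-01-01']
print(len(seen_books))  # 3
```

The tee step sees every item because it sits before the uniq step, which mirrors why authorsBooks in the EK9 program still holds both books published on 1990-01-07.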

Program 12 (The Final Example)

This example is totally different: it only uses book authors whose age is 50 or over (at the time of writing the book). It then ensures that each author is only included once; finally it sorts the authors by surname (and first name if their surname is the same) and outputs each name.

...
  defines function

    bookAuthor() as pure
      -> book as Book
      <- author as Author: book.author()

    acceptableAuthorAge() as pure
      -> author as Author
      <- rtn as Boolean: author.age() >= Age(50)

    comparingAuthorName() as pure
      ->
        author1 as Author
        author2 as Author
      <-
        rtn as Integer: author1.surname() <=> author2.surname()
      if rtn == 0
        rtn: author1.firstname() <=> author2.firstname()

    authorName() as pure
      -> author as Author
      <- rtn as Name: author.firstname() + " " + author.surname()
  
    theAuthorId() as pure
      -> author as Author
      <- rtn as AuthorId: author.id()

  defines program

    LibraryExample()
      stdout <- Stdout()
      books <- Library().books()

      cat books
        | map with bookAuthor
        | select by acceptableAuthorAge
        | uniq by theAuthorId
        | sort by comparingAuthorName
        | map with authorName
        > stdout
//EOF

The output of the program is as follows

John Doe
William Fence
Mark Keely
James Pickford
Mark Pickford
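
As a rough cross-language comparison, the same sequence of stages can be sketched in Python. This is an analogy under stated assumptions, not EK9 code: the Author and Book classes, the uniq_by helper and the sample data are all hypothetical, and each step is commented with the pipeline command it is meant to mirror.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Author:
    id: int
    firstname: str
    surname: str
    age: int  # age at the time the book was written

@dataclass(frozen=True)
class Book:
    title: str
    author: Author

def uniq_by(items, key):
    """Let an item through only the first time its key value is seen."""
    seen = set()
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item

# Hypothetical sample data (not the library used in the example above).
books = [
    Book("Book A", Author(1, "Zoe", "Young", 55)),
    Book("Book B", Author(1, "Zoe", "Young", 60)),
    Book("Book C", Author(2, "Amy", "Adams", 35)),
    Book("Book D", Author(3, "Bob", "Adams", 50)),
    Book("Book E", Author(4, "Al", "Adams", 71)),
]

authors = (book.author for book in books)        # map with bookAuthor
older = (a for a in authors if a.age >= 50)      # select by acceptableAuthorAge
unique = uniq_by(older, key=lambda a: a.id)      # uniq by theAuthorId
ordered = sorted(unique, key=lambda a: (a.surname, a.firstname))  # sort by comparingAuthorName
names = [f"{a.firstname} {a.surname}" for a in ordered]           # map with authorName

print("\n".join(names))
```

With this sample data the sketch prints Al Adams, Bob Adams and Zoe Young, one per line. Each stage only depends on the output of the stage before it, which is the property the pipe syntax makes explicit.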
        

Summary

The Stream pipeline functionality takes the most functional approach to software development. When used with functions, traits, records, classes and collections, it removes the need for deeply nested loops with stateful variables.

It can also be tolerant of missing data if need be. While not shown in these examples, if the published data had been unset there would have been no null pointer exceptions.

It can take some time for developers with an imperative or Object Oriented background to become used to this approach. With many OO languages now including 'chained'/'fluent' APIs, the clean, simple and composable syntax in EK9 should hopefully be easier to understand and adopt. The syntax has been chosen to be very similar to the Unix/Linux shell 'pipe' approach as the concept is so similar.

There is little doubt that this same functionality could have been written with much less code using an imperative style. EK9 does support this, but in general the smaller, more rounded and better documented the components are, the more reliable, reusable and easy to understand the code is over the longer term.

But probably more importantly, the design pattern is a standard one and it leaps off the page as to what is being done. This is very unlike arbitrary nested loops, which tend to need more time to 'grok'. Each time similar functionality is needed in loop form it can, and probably will, be implemented in a slightly different way (this can lead to confusion and defects).

In fact, as a contrast, here is a partial procedural/imperative implementation without the author name sorting.

...
  defines program
    ImperativeLibraryExample()
      stdout <- Stdout()
      books <- Library().books()
      
      uniqueAuthors as Dict of (AuthorId, Author): Dict()
      
      for book in books
        author <- book.author()
        //50 or over is the acceptable author age
        if author.age() >= 50
          if uniqueAuthors not contains author.id()
            uniqueAuthors += DictEntry(author.id(), author)
      
      for author in uniqueAuthors.values()
        stdout.println(author.firstname() + " " + author.surname())
//EOF

The uniq processing has been achieved through the use of a Dictionary/Map, but it could have been done with a List.
Here is the output (note that the author names are not sorted):

John Doe
Mark Pickford
James Pickford
Mark Keely
William Fence        
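
The List alternative mentioned above, together with the sorting step that the imperative version leaves out, can be sketched as follows. This is written in Python purely as an illustration and is not EK9 code: the tuple layout and the sample data are hypothetical.

```python
# Hypothetical sample data: (id, firstname, surname, age) tuples.
authors = [
    (1, "Zoe", "Young", 55),
    (2, "Amy", "Adams", 35),
    (1, "Zoe", "Young", 60),
    (3, "Bob", "Adams", 50),
]

seen_ids = []        # a List used for uniqueness instead of a Dictionary/Map
unique_authors = []
for author_id, firstname, surname, age in authors:
    # 50 or over is the acceptable author age.
    # `not in` on a list is a linear scan, unlike a dictionary lookup.
    if age >= 50 and author_id not in seen_ids:
        seen_ids.append(author_id)
        unique_authors.append((surname, firstname))

# Tuples compare element by element: surname first, then first name.
unique_authors.sort()
names = [f"{firstname} {surname}" for surname, firstname in unique_authors]
```

With this sample data names is Bob Adams followed by Zoe Young. The filtering, uniqueness and ordering concerns are now interleaved in one loop body, which is the trade-off being contrasted with the pipeline form.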
        

The code is somewhat shorter (albeit without any sorting of author names), and for one-off, short, simple processing maybe it's a better approach. But the minute you need to make a few alterations here and there, the Stream pipeline approach becomes much easier to alter and manipulate.

Conclusion

Hopefully these Stream pipelines will give a developer a set of flexible and standardised tools to process collections of Objects in a clear manner. This more functional approach has much to be said for it, as it reduces the number of methods needed on classes and provides for much more reuse of code.

In general, a more functional approach takes quite a bit more thought, and sometimes more code, than an imperative approach. But when reviewing the code, how the functional approach works tends to just leap off the page at you.

But with EK9 you now have a choice in your development approach.

Next Steps

Advanced class methods are covered in the next section. That section is not related to Stream pipelines, but is related to Classes.