

IBM InfoSphere Streams Version 2.0.0.4

IBM Streams Processing Language Standard Toolkit Reference

Note: Before using this information and the product it supports, read the general information under Notices on page 63.

Edition Notice

This document contains proprietary information of IBM. It is provided under a license agreement and is protected by copyright law. The information contained in this publication does not include any product warranties, and any statements provided in this manual should not be interpreted as such.

You can order IBM publications online or through your local IBM representative.
- To order publications online, go to the IBM Publications Center at www.ibm.com/e-business/linkweb/publications/servlet/pbi.wss
- To find your local IBM representative, go to the IBM Directory of Worldwide Contacts at www.ibm.com/planetwide

When you send information to IBM, you grant IBM a nonexclusive right to use or distribute the information in any way it believes appropriate without incurring any obligation to you.

Copyright IBM Corporation 2011, 2012. US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp.

Summary of changes
This topic describes updates to this documentation for IBM InfoSphere Streams Version 2.0 (all releases).

Note: The following revision characters are used in the InfoSphere Streams documentation to indicate updates for Version 2.0.0.4:
- In PDF files, updates are indicated by a vertical bar (|) to the left of each new or changed line of text.
- In HTML files, updates are surrounded by double angle brackets (>> and <<).

Updates for Version 2.0.0.4 (Version 2.0, Fix Pack 4)


- An optional output port is added to the DeDuplicate operator. For more information, see DeDuplicate on page 51.
- The following parameters are added to the DeDuplicate operator:
  - count
  - delta
  - deltaAttribute
  - flushOnPunctuation
  - resetOnDuplicate
  For more information about these parameters, see DeDuplicate on page 51.
- The DeDuplicate operator allows assignments to output attributes. For more information, see DeDuplicate on page 51.
- The ignoreExtraCSVValues parameter is added to the FileSource, TCPSource, and UDPSource operators to ignore extra CSV fields on a line. For more information, see FileSource on page 15.
- The supported custom output functions of the DirectoryScan operator are updated. For more information, see DirectoryScan on page 23.
- The output port of the Import operator is punctuation-preserving. For more information, see Import on page 39.
- The CountByDistinct and MinCount functions of the Aggregate operator are corrected. For more information, see Aggregate on page 9.
- The parsing parameter of the FileSource, TCPSource, and UDPSource operators is updated. For more information, see FileSource on page 15, TCPSource on page 26, and UDPSource on page 34.

Updates for Version 2.0.0.3 (Version 2.0, Fix Pack 3)


- The following optional parameters are added to the DirectoryScan operator:
  - moveToDirectory
  - ignoreDotFiles
  - ignoreExistingFilesAtStartup
  - sortBy
  - order
  For more information about these parameters, see DirectoryScan on page 23.
- Multiple DirectoryScan operators can scan the same directory simultaneously if the processed files are moved to a different directory before generating the output tuple.
- The DirectoryScan operator supports custom output functions to provide additional information about the generated file.
- The interface parameter is added to the TCPSource, TCPSink, and UDPSource operators to specify the network interface to use when registering the address with the name parameter.
- The nConnections metric is added to the TCPSource and TCPSink operators to indicate the number of active TCP/IP connections.
- The append parameter is added to the FileSink operator to append the generated tuples to the output file. For more information, see FileSink on page 20.
- The ignoreOpenErrors parameter is added to the FileSource operator to read successive files if a file cannot be opened for reading. For more information, see FileSource on page 15.
- An optional output port is added to the FileSource operator to indicate the files that were processed and those that could not be opened successfully.
- If an SPL program or a toolkit uses the new features that are added to the Standard Toolkit in IBM InfoSphere Streams Version 2.0.0.3, you must set the Standard Toolkit version to 1.0.1 in the info.xml file. For more information about the info.xml file and how to set dependencies on other toolkits, see how to create toolkits in the IBM Streams Processing Language Toolkit Development Reference.

Updates for Version 2.0.0.2 (Version 2.0, Fix Pack 2)


- The DirectoryScan operator uses change time (ctime) of the file to detect if the file has been recreated. For more information, see DirectoryScan on page 23.
- The hasHeaderLine parameter of the FileSource operator supports multiple lines of column names for csv format. For more information, see FileSource on page 15.
- A logic clause cannot be specified for the Export operator.
- A config clause cannot be specified for the Import and Export operators.
- If a file is moved to a directory that is on a different file system, a .rename subdirectory might be created in the target directory for the file move operation to be atomic. For more information, see FileSink on page 20 and FileSource on page 15.

Updates for Version 2.0.0.1 (Version 2.0, Fix Pack 1)


This guide was not updated for Version 2.0.0.1.


Abstract
This document describes the operators that are provided by the IBM Streams Processing Language (SPL) standard toolkit. This standard toolkit is specific to IBM InfoSphere Streams.


Contents

Summary of changes
Abstract

Chapter 1. Relational Operators
  Filter
  Functor
  Punctor
  Sort
  Join
  Aggregate

Chapter 2. Adapter Operators
  FileSource
  FileSink
  DirectoryScan
  TCPSource
  TCPSink
  UDPSource
  UDPSink
  Export
  Import
  MetricsSink

Chapter 3. Utility Operators
  Custom
  Beacon
  Throttle
  Delay
  Barrier
  Pair
  Split
  DeDuplicate
  Union
  ThreadedSplit
  DynamicFilter
  Gate
  JavaOp

Chapter 4. Compat Operators
  V1TCPSource
  V1TCPSink
  Compat.Sample

Notices

Chapter 1. Relational Operators


Filter
Description
The Filter operator removes tuples from a stream by passing along only those that satisfy a user-specified condition. Non-matching tuples may be sent to a second optional output.

Input Ports
The Filter operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The Filter operator is configurable with one or two output ports. The first output port is mandatory, non-mutating, and its punctuation mode is Preserving. The second output port is optional, non-mutating, and its punctuation mode is Preserving. The Filter operator requires that the stream type of the output port(s) match the stream type of the input port. The first output port receives the tuples that match the filter expression. The second output port, if present, receives the tuples that fail to match the filter expression.

Parameters
The Filter operator has the following parameters:

filter
This is an optional parameter, which specifies the condition that determines the tuples to be passed along by the Filter operator. It takes a single expression of type boolean as its value. When not specified, it is assumed to be true.

Windowing
The Filter operator does not accept any window configurations.

Assignments
The Filter operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.

    composite Main {
      graph
        stream<rstring name, uint32 age> Beat = Beacon() {}
        // one output port: only tuples with age < 30 are passed along
        stream<rstring name, uint32 age> Youngs = Filter(Beat)
        {
          param filter : age < 30u;
        }
        // two output ports: matches go to Younger, non-matches go to Older
        (stream<Beat> Younger; stream<Beat> Older) = Filter(Beat)
        {
          param filter : age < 30u;
        }
    }

Functor
Description
The Functor operator is used to transform input tuples into output ones, and optionally filter them as in a Filter operator. Each input tuple that is not filtered out results in one tuple on each output port.

Input Ports
The Functor operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The Functor operator is configurable with one or more output ports. The output ports are mutating and their punctuation mode is Preserving.

Parameters
The Functor operator has the following parameters:

filter
This is an optional parameter, which specifies the condition that determines which input tuples are to be operated on by the Functor operator. It takes a single expression of type boolean as its value. When not specified, it is assumed to be true; that is, tuples are transformed, but no filtering is performed.

Windowing
The Functor operator does not accept any window configurations.

Assignments
The Functor operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Functor operator expects all output tuple attributes to be completely assigned.

    composite Main {
      graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
        // one output port: filter, then assign the attributes that cannot be forwarded
        stream<rstring name, uint32 age, rstring login,
               tuple<boolean young, boolean rich> info>
          Annotated = Functor(Beat)
        {
          param filter : age >= 18u;
          output Annotated : login = lower(name),
                             info = { young = (age<=30u),
                                      rich = (salary>1000000ul) };
        }
        // two output ports: all output attributes are forwarded from the input
        (stream<rstring name, uint32 age> Age;
         stream<rstring name, uint64 salary> Salary) = Functor(Beat)
        {
          param filter : age >= 18u;
        }
    }
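The Assignments rules above also apply when a Functor has more than one output port. The following is a minimal sketch, not taken from the product documentation: the output stream names (AgeInfo, SalaryInfo) and the derived attributes (young, rich) are hypothetical. It shows an output clause that assigns a different derived attribute on each port, while name is forwarded automatically on both.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
    // Hypothetical streams: each port gets one derived attribute;
    // "name" has no assignment, so it is forwarded from the input tuple.
    (stream<rstring name, boolean young> AgeInfo;
     stream<rstring name, boolean rich> SalaryInfo) = Functor(Beat)
    {
      param  filter     : age >= 18u;
      output AgeInfo    : young = (age <= 30u);
             SalaryInfo : rich = (salary > 1000000ul);
    }
}
```

Because the filter applies to the input tuple, an input tuple that passes it produces one tuple on each of the two ports.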

Punctor
Description
The Punctor operator is used to transform input tuples into output ones and add window punctuations to the output.

Input Ports
The Punctor operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The Punctor operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating.

Parameters
The Punctor operator has the following parameters:

punctuate
This is a mandatory parameter, which specifies the condition that determines when a window punctuation is to be generated. It takes a single expression of type boolean as its value.

position
This is a mandatory parameter, which specifies the position of the generated window punctuation with respect to the current tuple. The valid values are before and after. If the value is before, the punctuation will be generated before the output tuple, otherwise it will be generated after the output tuple.

Windowing
The Punctor operator does not accept any window configurations.

Assignments
The Punctor operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Punctor operator expects all output tuple attributes to be completely assigned.

    composite Main {
      graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
        stream<rstring name, uint32 age, rstring login,
               tuple<boolean young, boolean rich> info>
          Annotated = Punctor(Beat)
        {
          param punctuate : age >= 18u;
                position : after; // add a punctuation after the generated tuple,
                                  // if the age is >= 18
          output Annotated : login = lower(name),
                             info = { young = (age<=30u),
                                      rich = (salary>1000000ul) };
        }
    }
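The example above places the punctuation after the matching tuple. As a complementary sketch (the stream name Marked is hypothetical), the same condition can be used with position set to before, so that the matching tuple becomes the first tuple following the window punctuation. The output schema equals the input schema here, so no output assignments are needed.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
    // Hypothetical stream: all attributes are forwarded automatically.
    stream<Beat> Marked = Punctor(Beat)
    {
      param punctuate : age >= 18u;
            position  : before; // the punctuation precedes the matching tuple
    }
}
```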

Sort
Description
The Sort operator is used to order tuples based on user-specified ordering expressions and window configurations.

Input Ports
The Sort operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is WindowBound. The Sort operator will process window marker punctuations when configured with a punctuation based window.

Output Ports
The Sort operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating. The Sort operator will generate a punctuation after each batch of sorted tuples it outputs. The Sort operator requires that the stream type for the output port matches the stream type for the input port.

Parameters
The Sort operator has the following parameters:

sortBy
This is a mandatory parameter that specifies one or more expressions to be used for sorting the tuples. The sort is performed lexicographically in ascending order. That is, the first expression is used first for the comparison and, in the case of equality, the second expression is considered, and so on. The default sort order of ascending implies that the output stream will produce tuples in non-decreasing order. The sort order can be changed using the order parameter.

order
This is an optional parameter that specifies either the global sort order, or the sort order for the individual expressions that appear in the sortBy parameter. The valid values are ascending and descending. When a single value is specified for the order parameter, it determines the global sort order. When multiple values are specified, the number of values must match the number of sortBy expressions.

partitionBy
This is an optional parameter that is only valid for a Sort operator configured with a partitioned window (see below). It specifies one or more expressions to be used for partitioning the input tuples into sub-windows, where all window and parameter configurations apply to the sub-windows, independently.

Windowing
The Sort operator supports the following window configurations:

tumbling, (count | delta | time | punctuation)-based eviction (, partitioned (, partitionEvictionSpec)? )?
sliding, count-based eviction, count-based trigger of 1 (, partitioned (, partitionEvictionSpec)? )?

For the tumbling variants, tuples are sorted when the window gets full and are output at once. A window marker punctuation is output at the end.

For the sliding variants, tuples are always kept in sorted order. Once the window gets full, every new tuple causes the first one in the sorted order to be removed from the window and output. This type of sort is referred to as progressive sort.

For the partitioned variants, the window specification applies to individual sub-windows identified by the partitionBy parameter.

For the tumbling variants, the final punctuation marker does not flush the window (so as not to break invariants on the output), whereas for the sliding variants (progressive), the final punctuation marker does flush the window.

Assignments
The Sort operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.

Metrics
The Sort operator has the following metrics:
- nCurrentPartitions: The number of partitions currently in the window for the Sort operator.
    composite Main {
      graph
        stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
        // count based window
        stream<Beat> Sorted0 = Sort(Beat)
        {
          window Beat : tumbling, count(10);
          param sortBy : name, (float64)salary/(float64)age;
        }
        // count based partitioned window
        stream<Beat> Sorted1 = Sort(Beat)
        {
          window Beat : tumbling, count(10), partitioned;
          param partitionBy : name;
                sortBy : (float64)salary/(float64)age;
        }
        // count based window, with sort order
        stream<Beat> Sorted2 = Sort(Beat)
        {
          window Beat : tumbling, count(10);
          param sortBy : name, (float64)salary/(float64)age;
                order : descending;
        }
        // count based window, with sort order for each sortBy expression
        stream<Beat> Sorted3 = Sort(Beat)
        {
          window Beat : tumbling, count(10);
          param sortBy : name, (float64)salary/(float64)age;
                order : ascending, descending;
        }
        // punctuation based window
        stream<Beat> Sorted4 = Sort(Beat)
        {
          window Beat : tumbling, punct();
          param sortBy : name, (float64)salary/(float64)age;
        }
        // time based window
        stream<Beat> Sorted5 = Sort(Beat)
        {
          window Beat : tumbling, time(10);
          param sortBy : name, (float64)salary/(float64)age;
        }
        // delta based window
        stream<uint32 id, uint32 age, uint64 salary> BeatId = Beacon() {}
        stream<BeatId> Sorted6 = Sort(BeatId)
        {
          window BeatId : tumbling, delta(id, 10u);
          param sortBy : (float64)salary/(float64)age;
        }
        // progressive sort
        stream<Beat> Sorted = Sort(Beat)
        {
          window Beat : sliding, count(10);
          param sortBy : name, (float64)salary/(float64)age;
        }
    }
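None of the Sort examples above uses the optional partitionEvictionSpec from the Windowing description. The following sketch combines a partitioned tumbling window with a partition eviction policy. It assumes the partitionAge(60.0) form that appears in the Join examples of this chapter is accepted by Sort in the same way; the stream name SortedPerName is hypothetical.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
    // Hypothetical stream: one sub-window of 10 tuples per distinct name,
    // each sorted by salary in descending order.
    // partitionAge(60.0) is assumed to discard partitions that have not
    // been referenced for 60 seconds, as in the Join examples.
    stream<Beat> SortedPerName = Sort(Beat)
    {
      window Beat : tumbling, count(10), partitioned, partitionAge(60.0);
      param partitionBy : name;
            sortBy : salary;
            order : descending;
    }
}
```

A partition eviction policy of this kind bounds the number of sub-windows when the partitionBy expression can take many distinct values.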

Join
Description
The Join operator is used to correlate tuples from two streams based on user-specified match predicates and window configurations.

When a tuple is received on an input port, it is inserted into the window corresponding to the input port, which causes the window to trigger. As part of the trigger processing, the tuple is compared against all tuples inside the window of the opposing input port. If the tuples match, then an output tuple will be produced for each match. If at least one output was generated, a window punctuation will be generated after all the outputs.

If equalityRHS and equalityLHS parameters are specified, the matching will be done using a hash table. Otherwise a scan of the tuples in the window will be done to find the matches.

In an outer join configuration, if a tuple does not get involved in a match during its stay in the join window, then it will be sent out to an output port right before its eviction from the window. See the algorithm parameter for details.

Partitioning may be used to split the tuples into partitioned windows.

Input Ports
The Join operator is configurable with two input ports. The input ports are non-mutating and their punctuation mode is Oblivious.

Output Ports
The Join operator is configurable with a single output port in the case of an inner join, one or two output ports in the case of a rightOuter or leftOuter join, and one or three output ports in the case of an outer join. The output ports are mutating. The punctuation mode is Generating for the first output port and Free for any other output ports that may exist. The Join operator will generate a punctuation after each batch of joined tuples it outputs on its first output port.

Parameters
The Join operator has the following parameters:

match
This optional parameter specifies an expression of type boolean to be used for matching the tuples. The expression could refer to attributes from both input ports. When omitted, the default value of true is used.

algorithm
This optional parameter is used to specify the join algorithm to be used. The valid options are leftOuter, rightOuter, outer, and inner.

In a left outer join, a tuple that is being evicted from the left port's window and has never been involved in a match earlier is paired with a default initialized tuple (whose attributes are default constructed) from the right port and output. If a defaultTupleRHS parameter is specified, its value is used instead of the default constructed tuple. A right outer join is similar, but applies to tuples that are being evicted from the right port's window and employs the defaultTupleLHS parameter if present. An outer join is a combination of left and right outer joins. The default for this parameter is the inner join option, which does not perform any action upon eviction of tuples.

For leftOuter and rightOuter joins, an optional second output port can be specified. In this case, the evicted tuples that have no matches are output on the second output port and are not joined with an empty tuple from the opposite window. The schema of the second output port must match that of the left input port in the case of a leftOuter join and the right input port in the case of a rightOuter join.

For an outer join, optional second and third output ports can be specified. This means that the outer join can have either one output port or three output ports. When specified, the second port is used to output evicted tuples from the left input port that have no matches and the third port is used to output the ones from the right input port. The schemas of the second and third output ports must match the schemas of the first and second input ports, respectively.

defaultTupleLHS
This optional parameter can be specified to indicate the tuple to be used from the left stream, for matching an expiring tuple from the right window that needs to be output as part of a right outer join or outer join algorithm. It is only valid for join operators with a single output port and those that have rightOuter or outer as the join algorithm. It can take a single value of tuple type, which must match the type of the tuples from the left stream.

defaultTupleRHS
This optional parameter can be specified to indicate the tuple to be used from the right stream, for matching an expiring tuple from the left window that needs to be output as part of a left outer join or outer join algorithm. It is only valid for join operators with a single output port and those that have leftOuter or outer as the join algorithm. It can take a single value of tuple type, which must match the type of the tuples from the right stream.

equalityLHS
This optional parameter is used to specify equality condition expressions from the left port. The number of expressions and their types must match those from the equalityRHS parameter. The expressions could refer to attributes from the left input port only.

equalityRHS
This optional parameter is used to specify equality condition expressions from the right port. The number of expressions and their types must match those from the equalityLHS parameter. The expressions could refer to attributes from the right input port only.

The equalityLHS and equalityRHS parameters can be used to specify equi-join match predicates, which results in using a hash-based join implementation, rather than a nested-loop one. They are not mutually exclusive with the match parameter and can be used together.

partitionByLHS
This optional parameter specifies one or more expressions to be used for partitioning the input tuples from the left port into sub-windows, where all window and parameter configurations apply to the sub-windows, independently. It can only be used if a partitioned window is defined for the left port (see below). The expressions could refer to attributes from the left input port only.

partitionByRHS
This optional parameter specifies one or more expressions to be used for partitioning the input tuples from the right port into sub-windows, where all window and parameter configurations apply to the sub-windows, independently. It can only be used if a partitioned window is defined for the right port (see below). The expressions could refer to attributes from the right input port only.

Windowing
The Join operator supports the following window configurations for a given input port:

sliding, (count | delta | time)-based eviction, count-based trigger of 1 (, partitioned (, partitionEvictionSpec)? )?

All window configurations have a count-based trigger of 1. This means that every time a tuple is received on a port, it is inserted into its window, which triggers the join processing. The newly inserted tuple is matched against the tuples resident in the window defined over the other input port. In case of matches, a result is output for each match and a window marker punctuation is output at the end.

For the partitioned variants, the window specification applies to individual sub-windows identified by the partitionBy parameter corresponding to the port. The left input port of the join cannot have a partitioned window defined unless a partitionByLHS parameter is specified. Similarly, the right input port of the join cannot have a partitioned window defined unless a partitionByRHS parameter is specified.

Assignments
The Join operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Join operator expects all output tuple attributes to be completely assigned.

Metrics
The Join operator has the following metrics:
- nCurrentPartitionsLHS: The number of partitions currently in the left hand side window for the Join operator.
- nCurrentPartitionsRHS: The number of partitions currently in the right hand side window for the Join operator.
    composite Main {
      graph
        stream<rstring firstName, rstring lastName, uint64 salary> BeatL = Beacon() {}
        stream<rstring name, rstring manager, rstring department> BeatR = Beacon() {}
        // join with a match condition
        stream<BeatL, BeatR> Join1 = Join(BeatL; BeatR) {
          window
            BeatL : sliding, count(100);
            BeatR : sliding, time(10);
          param
            match : BeatR.name == BeatL.firstName + " " + BeatL.lastName &&
                    department == "HR";
          output
            Join1 : salary = salary * 2ul;
        }
        // equi-join with an additional match condition
        stream<BeatL, BeatR> Join2 = Join(BeatL; BeatR) {
          window
            BeatL : sliding, count(100);
            BeatR : sliding, time(10);
          param
            match : department == "HR";
            equalityLHS : BeatL.firstName + " " + BeatL.lastName;
            equalityRHS : name;
          output
            Join2 : salary = salary * 2ul;
        }
        // equi-join with multiple equality expressions
        stream<BeatL, BeatR> Join3 = Join(BeatL; BeatR) {
          window
            BeatL : sliding, count(100);
            BeatR : sliding, time(10);
          param
            equalityLHS : BeatL.firstName + " " + BeatL.lastName, "HR";
            equalityRHS : name, department;
          output
            Join3 : salary = salary * 2ul;
        }
        // single-sided partitioned join with a 0 sized window on the right hand side
        // and a partitioned window of 1 on the left hand side
        stream<rstring ticker, decimal64 vwap> VWAP = Beacon() {}
        stream<rstring ticker, decimal64 askprice, decimal64 asksize> Quote = Beacon() {}
        stream<rstring ticker, decimal64 bargainIndex>
          Bargain = Join(VWAP; Quote)
        {
          window
            VWAP : sliding, count(1), partitioned;
            Quote : sliding, count(0);
          param
            match : vwap > askprice*100.0d;
            partitionByLHS : VWAP.ticker;
            equalityLHS : VWAP.ticker;
            equalityRHS : Quote.ticker;
          output
            Bargain : bargainIndex = exp(vwap-askprice*100.0d)*asksize;
        }
        // a left outer join with single output
        stream<rstring message, uint32 kind> MsgLHS = Beacon() {}
        stream<rstring message, uint32 kind, uint64 tm> MsgRHS = Beacon() {}
        stream<rstring message1, rstring message2>
          Msgs1 = Join(MsgLHS as L; MsgRHS as R)
        {
          window
            L : sliding, count(0);
            R : sliding, delta(tm, 10ul), partitioned;
          param
            algorithm : leftOuter;
            partitionByRHS : R.kind;
            defaultTupleRHS : { message = "N/A", kind = 0u, tm = 0ul};
            equalityLHS : L.message, L.kind;
            equalityRHS : R.message, R.kind;
          output
            Msgs1 : message1 = L.message, message2 = R.message;
        }
        // a right outer join with two outputs
        (stream<rstring message1, rstring message2> Msgs2;
         stream<rstring message, uint32 kind, uint64 tm> MsgsRHS2)
          = Join(MsgLHS as L; MsgRHS as R)
        {
          window
            L : sliding, count(0);
            R : sliding, delta(tm, 10ul), partitioned;
          param
            algorithm : rightOuter;
            partitionByRHS : R.kind;
            equalityLHS : L.message;
            equalityRHS : R.message;
          output
            Msgs2 : message1 = L.message, message2 = R.message;
        }
        // an outer join with three outputs
        (stream<rstring message1, rstring message2> Msgs3;
         stream<rstring message, uint32 kind> MsgsLHS3;
         stream<rstring message, uint32 kind, uint64 tm> MsgsRHS3)
          = Join(MsgLHS as L; MsgRHS as R)
        {
          window
            L : sliding, count(0);
            R : sliding, delta(tm, 10ul), partitioned;
          param
            algorithm : outer;
            partitionByRHS : R.kind;
            equalityLHS : L.message;
            equalityRHS : R.message;
          output
            Msgs3 : message1 = L.message, message2 = R.message;
        }
        // an outer join with a single output.
        // Discard unreferenced partitions after 60 seconds.
        stream<rstring message1, rstring message2>
          Msgs4 = Join(MsgLHS as L; MsgRHS as R)
        {
          window
            L : sliding, count(0);
            R : sliding, delta(tm, 10ul), partitioned, partitionAge(60.0);
          param
            algorithm : outer;
            partitionByRHS : R.kind;
            equalityLHS : L.message;
            equalityRHS : R.message;
          output
            Msgs4 : message1 = L.message, message2 = R.message;
        }
    }
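The examples above cover a leftOuter join with a single output and a rightOuter join with two outputs. For completeness, the following sketch shows a leftOuter join with two outputs, following the rule in the algorithm parameter description that the second port's schema must match the left input port. The output stream names (Matched, UnmatchedLHS) are hypothetical, and the window sizes are chosen only for illustration.

```spl
composite Main {
  graph
    stream<rstring message, uint32 kind> MsgLHS = Beacon() {}
    stream<rstring message, uint32 kind, uint64 tm> MsgRHS = Beacon() {}
    // Hypothetical streams: Matched carries the joined tuples; UnmatchedLHS
    // has the schema of the left input port and receives left tuples that
    // are evicted without ever having matched.
    (stream<rstring message1, rstring message2> Matched;
     stream<rstring message, uint32 kind> UnmatchedLHS)
      = Join(MsgLHS as L; MsgRHS as R)
    {
      window
        L : sliding, count(0);
        R : sliding, delta(tm, 10ul);
      param
        algorithm : leftOuter;
        equalityLHS : L.message;
        equalityRHS : R.message;
      output
        Matched : message1 = L.message, message2 = R.message;
    }
}
```

With a second output port present, defaultTupleRHS is not used, since that parameter is only valid for join operators with a single output port.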

Aggregate
Description The Aggregate operator is used to compute user-specified aggregations over tuples gathered in a window.
Chapter 1. Relational Operators

Input Ports
The Aggregate operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is WindowBound. The Aggregate operator processes window marker punctuations when configured with a punctuation-based window.

Output Ports
The Aggregate operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating. The Aggregate operator generates a window marker punctuation after each batch of aggregations it outputs.

Parameters
The Aggregate operator has the following parameters:

groupBy
This is an optional parameter that specifies one or more expressions to be used for dividing the tuples in a window into groups. When a window fires (a sliding window triggers or a tumbling window flushes), one tuple with the user-specified aggregations is computed for each group in the window and these tuples are output as a batch. A window marker punctuation is output after the tuples.

partitionBy
This is an optional parameter that is only valid for an Aggregate operator configured with a partitioned window (see below). It specifies one or more expressions to be used for partitioning the input tuples into sub-windows, where all window and parameter configurations apply to the sub-windows independently.

aggregateIncompleteWindows
This optional parameter of type boolean is valid only for sliding windows. The default value is false. When set to true, aggregations are performed whenever a trigger occurs, even if the window has not filled up. When set to false, triggers that occur before the window is full are ignored.

Windowing
The Aggregate operator supports the following window configurations:

tumbling, (count | delta | time | punctuation)-based eviction (, partitioned (, partitionEvictionSpec)? )?
sliding, (count | delta | time)-based eviction, (count | delta | time)-based trigger (, partitioned (, partitionEvictionSpec)? )?

For the tumbling variants, tuples are aggregated when the window gets full (and flushes). The tuples containing the aggregates are output at once, followed by a window marker punctuation. Note that more than one tuple can be output when the groupBy parameter is specified.

For the sliding variants, tuples are aggregated when the window triggers. The tuples containing the aggregates are output at once, followed by a window marker punctuation. Note that more than one tuple can be output when the groupBy parameter is specified. The sliding windows for an Aggregate operator do not fire until the window is full for the first time, unless aggregateIncompleteWindows is true. This rule does not apply to sliding windows with time-based trigger policies. Such windows are assumed to be full when they start out.
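The effect of aggregateIncompleteWindows on a sliding window can be sketched in Python. This is a simplified model, not the operator's implementation: it assumes count-based eviction and count-based trigger policies, treats the window as full once it holds `size` tuples, and uses Max as the only aggregate. The names `sliding_max`, `size`, `trigger`, and `aggregate_incomplete` are hypothetical.

```python
from collections import deque


def sliding_max(values, size, trigger, aggregate_incomplete=False):
    """Model a sliding window with count(size) eviction and count(trigger) trigger.

    Returns the Max aggregate emitted at each trigger that is not ignored.
    """
    window = deque(maxlen=size)  # the oldest value is evicted automatically
    since_trigger = 0
    results = []
    for value in values:
        window.append(value)
        since_trigger += 1
        if since_trigger == trigger:
            since_trigger = 0
            # Assumption: "full" means the window currently holds `size` tuples.
            if aggregate_incomplete or len(window) == size:
                results.append(max(window))
    return results


salaries = [3, 1, 4, 1, 5]
print(sliding_max(salaries, size=3, trigger=1))                             # [4, 4, 5]
print(sliding_max(salaries, size=3, trigger=1, aggregate_incomplete=True))  # [3, 3, 4, 4, 5]
```

With the default setting the first two triggers are ignored because the window has not yet filled; with aggregate_incomplete set, every trigger produces an aggregate.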


For both tumbling and sliding windows, when a time-based window with no tuples in it fires, just a window marker punctuation is output. When a tumbling, punctuation-based window with no tuples in it receives a window marker punctuation, just a window marker punctuation is output.

For the partitioned variants, the window specification and parameters apply to individual sub-windows identified by the partitionBy parameter, as if there were separate Aggregate operators for each partition.

The final punctuation marker does not flush any of the pending windows.

Assignments
The Aggregate operator allows aggregated assignments to output attributes. An aggregated assignment has an aggregation function appearing on the right-hand side of the assignment. The following aggregation functions are supported:

• int32 Count(): number of tuples in the group.
• int32 CountGroups(): number of groups in a window.
• int32 CountAll(): number of tuples in the window.
• list<int32> CountByGroup(): list of group sizes (number of tuples in the group) in a window.
• <any T> T Any(T v): expression value (v) computed for any tuple in the group (useful for expressions that depend on the groupBy expressions).
• <any T> T First(T v): expression value (v) computed for the first (earliest) tuple in the group.
• <any T> T Last(T v): expression value (v) computed for the last (latest) tuple in the group.
• <any T> list<T> Collect(T v): collection of expression values (v's) computed for the tuples in the group.
• <any T> list<T> CollectDistinct(T v): collection of unique expression values (v's) computed for the tuples in the group.
• <any T> int32 CountDistinct(T v): number of distinct expression values (v's) computed for the tuples in the group.
• <any T> list<int32> CountByDistinct(T v): collection of cardinalities for the distinct expression values (v's) computed for the tuples in the group, where the cardinality is the number of times the distinct value appears. The order of entries in a CountByDistinct result matches the order of entries in a corresponding CollectDistinct result.
• <numeric T> T Average(T v): average of the expression values (v's) computed for the tuples in the group.
• <numeric T> list<T> Average(list<T> v): list of per-element averages of the expression list values (v's) computed for the tuples in the group. All lists must have the same size.
• <numeric T> T Sum(T v): sum of the expression values (v's) computed for the tuples in the group.
• <string T> T Sum(T v): same as above, but for strings (concatenation).
• <numeric T> list<T> Sum(list<T> v): list of per-element sums of the expression list values (v's) computed for the tuples in the group. All lists must have the same size.
• <ordered T> T Max(T v): maximum of the expression values (v's) computed for the tuples in the group.


• <numeric T> list<T> Max(list<T> v): list of per-element maximums of the expression list values (v's) computed for the tuples in the group. All lists must have the same size.
• <ordered T> T Min(T v): minimum of the expression values (v's) computed for the tuples in the group.
  Remember: When applied to lists, the Min and Max aggregate functions perform a column-wise comparison. For example, Min([1,2,1], [1,1,2]) == [1,1,1]. In contrast, the InfoSphere Streams Version 1.2 Min and Max aggregate functions return the smallest or largest list using a lexicographic comparison. For example, Min([1,2,1], [1,1,2]) == [1,1,2].
• <numeric T> list<T> Min(list<T> v): list of per-element minimums of the expression list values (v's) computed for the tuples in the group. All lists must have the same size.
• <ordered T> int32 MaxCount(T v): similar to Max, but returns the number of tuples for which the maximum value occurs, rather than the maximum value itself.
• <ordered T> int32 MinCount(T v): similar to Min, but returns the number of tuples for which the minimum value occurs, rather than the minimum value itself.
• <ordered T, any K> K ArgMin(T v, K w): the argument expression value (w) corresponding to the minimum of the objective expression values (v's) computed for tuples in the group.
• <ordered T, any K> list<K> CollectArgMin(T v, K w): similar to ArgMin, but returns a list in case more than one argument minimizes the objective.
• <ordered T, any K> K ArgMax(T v, K w): the argument expression value (w) corresponding to the maximum of the objective expression values (v's) computed for tuples in the group.
• <ordered T, any K> list<K> CollectArgMax(T v, K w): similar to ArgMax, but returns a list in case more than one argument maximizes the objective.
• <numeric T> T SampleStdDev(T v): sample standard deviation of the expression values (v's) computed for the tuples in the group.
• <numeric T> T PopulationStdDev(T v): population standard deviation of the expression values (v's) computed for the tuples in the group.

Output attributes that have no assignment are automatically forwarded from the input attributes using the Last aggregate.

Metrics
The Aggregate operator has the following metrics:
• nCurrentPartitions: The number of partitions currently in the window for the Aggregate operator.
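Two of the rules in the function list are easy to misread: the pairing between CollectDistinct and CountByDistinct, and the column-wise behavior of Min and Max on lists. The following Python sketch models both. It is illustrative only; the helper names are hypothetical, and the first-appearance ordering used for distinct values is an assumption (the reference only states that the two results share the same order).

```python
def collect_distinct(values):
    """Unique values, here kept in order of first appearance (an assumption)."""
    distinct = []
    for value in values:
        if value not in distinct:
            distinct.append(value)
    return distinct


def count_by_distinct(values):
    """Cardinality of each distinct value, in the same order as collect_distinct."""
    return [values.count(value) for value in collect_distinct(values)]


def min_columnwise(lists):
    """Per-element minimum over equally sized lists (the Version 2.0 behavior)."""
    return [min(column) for column in zip(*lists)]


states = ["NY", "ON", "NY"]
print(collect_distinct(states))    # ['NY', 'ON']
print(count_by_distinct(states))   # [2, 1]

rows = [[1, 2, 1], [1, 1, 2]]
print(min_columnwise(rows))        # [1, 1, 1]  column-wise
print(min(rows))                   # [1, 1, 2]  lexicographic, as in Version 1.2
```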
composite Main {
  graph
    stream<rstring name, uint64 id, rstring country, rstring city, uint32 age, int32 salary> Beat = Beacon() {}

    // tumbling window with no group by
    stream<int32 maxSalary, uint32 ageOfMaxSalary> Agg0 = Aggregate(Beat) {
      window
        Beat : tumbling, time(10.5);
      output
        Agg0 : maxSalary = Max(salary), ageOfMaxSalary = ArgMax(salary, age);
    }

    // tumbling window with group by
    stream<rstring country, rstring city, int32 maxSalary> Agg1 = Aggregate(Beat) {
      window
        Beat : tumbling, punct();
      param
        groupBy : country, city;
      output
        Agg1 : maxSalary = Max(salary);
    }

    // tumbling partitioned window with no group by
    stream<int32 maxSalary, int32 numPeopleWithMaxSalary> Agg2 = Aggregate(Beat) {
      window
        Beat : tumbling, delta(id, 10lu), partitioned;
      param
        partitionBy : country, city;
      output
        Agg2 : maxSalary = Max(salary), numPeopleWithMaxSalary = MaxCount(salary);
    }

    // tumbling partitioned window with group by
    stream<rstring city, int32 maxSalary, list<rstring> peopleWithMaxSalary> Agg3 = Aggregate(Beat) {
      window
        Beat : tumbling, count(10), partitioned;
      param
        groupBy : city;
        partitionBy : country;
      output
        Agg3 : maxSalary = Max(salary), peopleWithMaxSalary = CollectArgMax(salary, name);
    }

    // sliding window with no group by
    stream<Beat, tuple<int32 maxSalary, uint32 ageOfMaxSalary> > Agg4 = Aggregate(Beat) {
      window
        Beat : sliding, time(10.5), count(10);
      output
        Agg4 : maxSalary = Max(salary), ageOfMaxSalary = ArgMax(salary, age);
    }

    // sliding window with group by
    stream<Beat, tuple<int32 maxSalary> > Agg5 = Aggregate(Beat) {
      window
        Beat : sliding, count(10), count(1);
      param
        groupBy : country, city;
      output
        Agg5 : maxSalary = Max(salary);
    }

    // sliding partitioned window with no group by
    stream<Beat, tuple<int32 maxSalary, int32 numPeopleWithMaxSalary> > Agg6 = Aggregate(Beat) {
      window
        Beat : sliding, delta(id, 10lu), count(10), partitioned;
      param
        partitionBy : country, city;
      output
        Agg6 : maxSalary = Max(salary), numPeopleWithMaxSalary = MaxCount(salary);
    }

    // sliding partitioned window with group by
    stream<Beat, tuple<int32 maxSalary, list<rstring> peopleWithMaxSalary> > Agg7 = Aggregate(Beat) {
      window
        Beat : sliding, count(10), time(1), partitioned;
      param
        groupBy : city;
        partitionBy : country;
      output
        Agg7 : maxSalary = Max(salary), peopleWithMaxSalary = CollectArgMax(salary, name);
    }
}
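The examples above combine Max with MaxCount, ArgMax, and CollectArgMax. A short Python sketch, written against a single hypothetical group of (salary, name) pairs, shows how these four results relate to each other. The helper names are illustrative, and returning the first maximizing argument from `arg_max` when several tuples tie is an assumption; the reference does not state which one ArgMax picks.

```python
def max_count(pairs):
    """Number of tuples whose objective equals the group maximum (MaxCount)."""
    best = max(objective for objective, _ in pairs)
    return sum(1 for objective, _ in pairs if objective == best)


def collect_arg_max(pairs):
    """All arguments whose objective equals the group maximum (CollectArgMax)."""
    best = max(objective for objective, _ in pairs)
    return [argument for objective, argument in pairs if objective == best]


def arg_max(pairs):
    """One maximizing argument (ArgMax); tie-breaking by first match is assumed."""
    return collect_arg_max(pairs)[0]


# Hypothetical group: (salary, name) for each tuple in the window.
group = [(50, "Ann"), (70, "Bob"), (70, "Cid")]
print(max(salary for salary, _ in group))  # 70
print(max_count(group))                    # 2
print(collect_arg_max(group))              # ['Bob', 'Cid']
print(arg_max(group))                      # Bob
```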


Chapter 2. Adapter Operators


FileSource
Description
The FileSource operator reads data from a file and produces tuples as a result.

Input Ports
The FileSource operator has one optional input port. If present, the input port schema must be a tuple with a single rstring attribute. Each tuple holds the name of a file to be read by the FileSource operator. While processing the tuple, the operator reads the entire file and generates tuples from its contents.

Output Ports
The FileSource operator is configurable with two output ports. The first output port is mutating and its punctuation mode is Generating. The FileSource operator outputs a window marker punctuation when the file is read in full. The second output port is optional and must contain a tuple with two attributes: one of type rstring and one of type int32. This stream generates a tuple with the file name and 0 as the attribute values when the end of the file being read is reached. If a file fails to open, the stream generates a tuple with the file name and the system error code. This allows a downstream operator to know which files were processed and which files could not be opened.

Parameters
The FileSource operator has the following parameters:

file
This is an optional parameter of type rstring that specifies the name of the source file. It must not be present if the FileSource operator has an input port; otherwise it must be present. The file parameter may refer to a named pipe, unless the hotFile parameter is set to true: hotFile is implemented using seek, and seek is not valid on a named pipe.

format
This optional parameter specifies the format of the file. Valid values are txt, csv, bin, line, and block. The default format is csv. This parameter can only take a single value. The individual format options are as follows:
• txt: This format expects the file to be structured as a series of lines, where each line is a tuple literal, free of any type suffixes. String literals must be in double quotes. The # character can be used to mark comment lines. An example is as follows:
    # tuple<rstring name, uint32 age>
    {name="John", age=40}
    {name="Mary", age=35}
• csv: This format expects the file to be structured as a series of lines, where each line is a list of comma-separated values. String literals that are used at the outermost level can appear without the double quotes, unless they contain a ',' character or escaped characters, in which case double quotes are required. Both rstring and ustring values should appear as UTF-8 encoded strings. For fields missing in the csv formatted line (as in , ,), default constructed values are used, unless the defaultTuple parameter is specified. The separator parameter may be used to change the default separator of ','. The '.' character is used as the decimal point for binary and decimal floating point data. The # character can be used to mark comment lines. An example is as follows:
    # tuple<rstring name, uint32 age, list<tuple<rstring city, rstring state> > locations>
    John, 40, [{city="New York City",state="NY"}, {city="Atlanta",state="GA"}]
    "Mary, and co.", 35, [{city="Toronto",state="ON"}, {city="White Plains",state="NY"}]
• bin: This format expects the file to be structured as a series of tuples in binary, using network byte order. Tuple attributes are assumed to be serialized in sequence to form a tuple.
• line: This format expects the file to be structured as a series of lines. It also expects the output stream schema to contain a single attribute of type rstring. Each line is converted into a tuple, where the line text (excluding the end of line marker) becomes the rstring attribute in the output tuple. The end of line marker can be customized with the eolMarker parameter.
• block: This format expects the file to be structured as a series of binary blocks. It also expects the output stream schema to contain a single attribute of type blob. Each block is converted into a tuple. The block size can be customized with the blockSize parameter. The last block read from the file may be smaller than blockSize bytes.

hasHeaderLine
This optional attribute-free parameter of type boolean or uint32 is valid only if the format is csv. If true, the first line in the file is read and ignored. If false (the default), no lines are skipped. If a uint32 expression is passed, that number of lines is skipped. This allows column names to be present in the first several lines of the file.

ignoreOpenErrors
This optional parameter of type boolean specifies whether the FileSource operator continues executing if the input file cannot be opened. If the ignoreOpenErrors parameter is set to true and an input file cannot be opened, the FileSource operator logs an error and proceeds with the next input file. If the parameter is not present or is set to false, the FileSource operator logs an error and terminates. By default, the ignoreOpenErrors parameter is set to false.

hasDelayField
This optional parameter of type boolean instructs the FileSource operator to expect an additional attribute that specifies a delay to be used to pace the generation of the output tuples. By default, it is false. This parameter can only be used with the txt, csv, and bin formats. The type of the delay attribute must be float64 and it is assumed to be in seconds. The delay attribute must appear before the tuple. In the case of the txt and csv formats, the delay attribute is separated from the tuple by a single comma with optional spaces before and after it. For example, for the txt format:
    # tuple<rstring name, uint32 age>
    1.50, {name="John", age=40}
    1.75, {name="Mary", age=35}
And for the csv format:
    # tuple<rstring name, uint32 age>
    1.50, John, 40
    1.75, Mary, 35

defaultTuple
This optional parameter specifies the attribute values to be used in case of missing values in the source data. It is only valid for the csv format. It can take a single value of tuple type. This type must match the type of the output port tuples.

parsing
This optional parameter customizes the parsing behavior of the FileSource operator. There are three valid values: strict, permissive, and fast. When strict is specified, incorrectly formatted tuples result in a runtime error and termination of the operator. When permissive is specified, incorrectly formatted tuples result in a runtime log entry, and the parser makes an effort to skip to the next tuple (formats txt and csv) and continue. If the format is bin, the parser closes the current file and starts reading the next file (if the FileSource operator has an input stream). permissive can only be used with the txt, csv, and bin formats. When fast is specified, the input file is assumed to be formatted correctly, and no runtime checks are performed. Incorrect input in fast mode causes undefined behavior. The default parsing mode is strict.

compression
This optional parameter specifies that the source file is compressed. There are three valid values, representing the available compression algorithms: zlib, gzip, and bzip2.

encoding
This optional rstring parameter specifies the character set encoding used in the input file.
The contents of the file are converted to the UTF-8 character set from the given character set after any decompression and before the tuples are extracted. An example of a valid character set encoding is ISO_8859-9. A list of available encodings can be retrieved using the iconv --list command. encoding is not valid with the bin or block formats.

hotFile
This optional parameter of type boolean specifies whether the input file is hot. As opposed to regular files, hot files are not closed when the end of the file is reached for the first time. Instead, the file is continuously checked for more data. If the file size shrinks during these checks, the file offset is reset to the beginning of the file. The default value for the hotFile parameter is false. When set to true, a final marker is not sent upon reaching the end of the file, as hot files ignore that event. Instead, a final marker is sent upon shutdown, after a window marker punctuation is sent. Additionally, if the file offset is ever reset, a window marker punctuation is sent. The hotFile parameter may not be specified if the FileSource operator has an input port, or if deleteFile or moveFileToDirectory is specified.

deleteFile
This optional parameter of type boolean specifies that the file should be removed after processing of the file is finished. The deleteFile parameter cannot be specified if hotFile or moveFileToDirectory is specified.

moveFileToDirectory
This parameter of type rstring specifies that the file should be moved to the named directory after processing of the file is finished. Any file of the same name in the moveFileToDirectory directory is removed before the move is done. The moveFileToDirectory parameter cannot be specified if hotFile or deleteFile is specified. A .rename subdirectory may be created in the target directory if the target directory is on a different filesystem. This is used to ensure that the files appear atomically in the target directory.

eolMarker
This optional parameter of type rstring specifies the end of line marker. It can only be used when the line format is specified. It defaults to "\n". Valid values include strings with one or two characters, such as "\r" and "\r\n".

initDelay
This optional float64 parameter specifies the number of seconds that the FileSource operator delays before starting to produce tuples. If the FileSource operator has an input stream, the delay happens on receipt of the first tuple. During the delay, the operator is blocked, and any further input tuples block as well.

blockSize
This parameter of type uint32 specifies the block size. It is mandatory when the block format is specified and cannot appear otherwise.

separator
This optional rstring parameter specifies an alternate separator character for the csv format. It must be a single-character string constant. separator may only be specified if the format is csv.

ignoreExtraCSVValues
This optional parameter of type boolean is only relevant with format : csv. If true, extra data on the current input line after the last attribute read is skipped. If the parameter is not present or is set to false, extra data on a line in csv format causes an error to be logged (parsing : permissive) or an exception to be raised (parsing : strict).
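The interaction between missing csv fields, defaultTuple, and ignoreExtraCSVValues can be modeled with a small Python sketch. This is a deliberately simplified illustration, not the operator's parser: it splits on the separator without handling quoted strings or nested literals, it rejects lines with too few fields (behavior the reference does not describe), and the names `parse_csv_line`, `schema`, `default_tuple`, and `ignore_extra` are hypothetical.

```python
def parse_csv_line(line, schema, default_tuple=None, separator=",", ignore_extra=False):
    """Parse one csv line into a dict.

    schema is a list of (attribute name, Python type) pairs. An empty field
    takes its value from default_tuple when given, otherwise the type's
    default-constructed value (for example int() == 0, str() == "").
    """
    fields = [field.strip() for field in line.split(separator)]
    if len(fields) < len(schema):
        # Assumption: a line with too few fields is treated as malformed.
        raise ValueError("too few fields on line")
    if len(fields) > len(schema):
        if not ignore_extra:
            # Models the strict-mode exception for extra data on a line.
            raise ValueError("extra data on line")
        fields = fields[:len(schema)]
    record = {}
    for (name, convert), text in zip(schema, fields):
        if text == "":
            record[name] = default_tuple[name] if default_tuple else convert()
        else:
            record[name] = convert(text)
    return record


schema = [("name", str), ("age", int)]
print(parse_csv_line("John, 40", schema))   # {'name': 'John', 'age': 40}
print(parse_csv_line("Mary,", schema))      # {'name': 'Mary', 'age': 0}
print(parse_csv_line(",", schema, default_tuple={"name": "foo", "age": 19}))
print(parse_csv_line("John, 40, extra", schema, ignore_extra=True))
```

In permissive mode the operator would log the malformed line and move on to the next one; a caller of this sketch could model that by catching ValueError per line.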


Windowing
The FileSource operator does not accept any window configurations.

Assignments
The FileSource operator does not allow assignments to output attributes.

Metrics
The FileSource operator has the following metrics:
• nFilesOpened: The number of files opened by the FileSource operator. This metric is only of interest if the FileSource operator has an input port.
• nInvalidTuples: The number of tuples that failed to read correctly in csv or txt format.

Exceptions
The FileSource operator throws an exception and terminates in the following cases:
• The input file cannot be opened for reading.
• The moveFileToDirectory directory does not exist.
• The moveFileToDirectory value is not a directory.
composite Main {
  graph
    // source operator with a relative file argument
    stream<rstring name, uint32 age, uint64 salary> Beat = FileSource() {
      param
        file : "People.dat"; // looks for <application-dir>/data/People.dat
    }

    // source operator with a default tuple for missing arguments
    stream<rstring name, uint32 age, uint64 salary> Beat1 = FileSource() {
      param
        file : "People.dat";
        defaultTuple : {name="foo", age=19u, salary=10000ul};
    }

    // source operator with an absolute file argument and hot file option
    stream<Beat> Beat2 = FileSource() {
      param
        file : "/tmp/People.dat";
        hotFile : true;
    }

    // source operator with a csv format specifier,
    // hasDelayField option, and custom separator
    stream<Beat> Beat3 = FileSource() {
      param
        file : "People.dat";
        format : csv;
        separator : "|";
        hasDelayField : true;
    }

    // source operator with a txt format specifier and compression
    stream<Beat> Beat4 = FileSource() {
      param
        file : "People.dat";
        format : txt;
        compression : zlib;
    }

    // source operator with a csv format specifier and with strict parsing, waiting
    // 5 seconds before starting to process the file
    stream<Beat> Beat5 = FileSource() {
      param
        file : "People.dat";
        format : csv;
        parsing : strict;
        initDelay : 5.0;
    }

    // source operator with a bin format specifier
    stream<Beat> Beat6 = FileSource() {
      param
        file : "People.dat";
        format : bin;
    }

    // source operator with a line format specifier
    stream<rstring line> Beat7 = FileSource() {
      param
        file : "People.dat";
        format : line;
    }

    // source operator with a line format specifier, and an eolMarker option
    stream<rstring line> Beat8 = FileSource() {
      param
        file : "People.dat";
        format : line;
        eolMarker : "\r";
    }

    // source operator with a block format specifier
    stream<blob block> Beat9 = FileSource() {
      param
        file : "People.dat";
        format : block;
        blockSize : 1024u;
    }

    stream<rstring filename> Files = DirectoryScan() {
      param
        directory : "foo";
    }

    // source operator reading tuples of 2 int32s from files in directory foo
    // Delete the files after processing is done
    stream<int32 i, int32 j> Beat10 = FileSource(Files) {
      param
        deleteFile : true;
    }
}

The following example uses the second output stream, and shows how to get the string form of the reason for failure:

composite Main() {
  graph
    stream <rstring f> A = Beacon () {
      logic
        state : mutable int32 i = 0;
      param
        iterations : 4;
      output
        A : f = "file." + (rstring)i++;
    }

    (stream<int32 a> B; stream<rstring f, int32 e> C) = FileSource (A) {
      param
        ignoreOpenErrors : true;
    }

    stream<rstring f, int32 e, rstring reason> D = Functor (C) {
      output
        D : reason = strerror (e);
    }

    () as Nil = FileSink (D) {
      param
        file : "out";
    }
}
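The hasDelayField layout used for the txt and csv formats (a float64 delay in seconds, a single comma, then the tuple) can be illustrated with a short Python sketch. It is a model of the file layout only, under the assumption that each delay is applied immediately before its tuple is emitted; the function names `split_delay` and `replay`, and the injectable `sleep` argument, are hypothetical.

```python
import time


def split_delay(line):
    """Split 'delay, rest-of-tuple' at the first comma."""
    delay_text, rest = line.split(",", 1)
    return float(delay_text), rest.strip()


def replay(lines, emit, sleep=time.sleep):
    """Emit each tuple text after waiting for its delay; '#' lines are comments."""
    for line in lines:
        if line.startswith("#"):
            continue
        delay, payload = split_delay(line)
        sleep(delay)   # assumption: the delay is applied before emitting the tuple
        emit(payload)


lines = [
    "# tuple<rstring name, uint32 age>",
    "1.50, John, 40",
    "1.75, Mary, 35",
]
# Pass a no-op sleep so the demonstration finishes immediately.
replay(lines, print, sleep=lambda seconds: None)
```

Passing the sleep function as a parameter keeps the sketch testable without real waiting; with the default `time.sleep`, the two tuples would be emitted 1.50 and 1.75 seconds apart.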

FileSink
Description
The FileSink operator writes tuples to a file.

Input Ports
The FileSink operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The FileSink operator is configurable with an optional output stream of type stream<rstring>, which carries the name of the file that was just closed. If the file is moved, the destination file name is submitted on the output stream.

Parameters
The FileSink operator has the following parameters:

file
This is a mandatory parameter that specifies the name of the output file. See the corresponding parameter in the FileSource operator for details. Only the last component of the pathname is created if it does not already exist. All directories in the pathname up to the last component must already exist. For example, in file : "/a/b/c", /a and /a/b must already exist and be directories. The file is created as an empty file, discarding any previous contents. The user ID and the umask of the instance owner are used. The tuples written to the file are flushed to disk according to the flush and flushOnPunctuation parameters.

append
This optional boolean parameter specifies that the generated tuples are appended to the output file. If false or not specified, the output file is truncated before the tuples are generated.

format
See the corresponding parameter in the FileSource on page 15 operator for details.

hasDelayField
This optional parameter of type boolean is used to output an additional attribute per tuple, which specifies the inter-arrival delays between the input tuples. See the corresponding parameter in the FileSource on page 15 operator for details.

compression
See the corresponding parameter in the FileSource on page 15 operator.

encoding
This optional rstring parameter specifies the character set encoding used in the output file. Data written to the output file is converted from the UTF-8 character set to the given character set before any compression is performed. encoding is not valid with the bin or block formats.

eolMarker
See the corresponding parameter in the FileSource on page 15 operator.

flush
This optional parameter of type uint32 is used to flush the output file after the given number of tuples. By default, no flushing based on tuple counts is performed.
Note: If an application expects low volumes of data, use the flush parameter to ensure that the output file is written to disk.

flushOnPunctuation
This optional parameter of type boolean is used to flush the output file when a punctuation is received. flushOnPunctuation defaults to true.

writePunctuations
This optional parameter of type boolean is used to write punctuations to the output file. It is false by default. writePunctuations can only be used with the txt and csv formats.

separator
See the corresponding parameter in the FileSource on page 15 operator.

quoteStrings
This optional parameter of type boolean controls the quoting of top-level rstrings. It is true by default. If true, rstrings in the tuple are generated with a leading and trailing double quote ("), and control characters are escaped. If false, rstrings in the tuple are written as is. quoteStrings can only be used with the csv format.

closeMode
This is an optional parameter of type enum {punct, count, size, time, never}. The default value is never. For any other value, when the specified condition is satisfied, the current output file is closed and a new file is opened for writing. In such cases, the file parameter must contain one or more {id} fields to indicate the parts that are replaced with the file ID. For example, in the file name "myfile{id}.dat", each {id} is replaced by 0 for the first file, 1 for the next file that is opened, and so on.

tuplesPerFile
This parameter specifies the maximum number of tuples that can be received for each output file. When the specified number of tuples has been received, the current output file is closed and a new file is opened for writing. This parameter is of type uint64 or uint32 and must be specified if the closeMode parameter is set to count.

timePerFile
This parameter of type float64 specifies the approximate time, in seconds, after which the current output file is closed and a new file is opened. This parameter must be specified if the closeMode parameter is set to time.

bytesPerFile
This parameter specifies the approximate size of the output file, in bytes. When the file size exceeds the specified number of bytes, the current output file is closed and a new file is opened. This parameter is of type uint64 or uint32 and must be specified when the closeMode parameter is set to size.

moveFileToDirectory
This optional parameter of type rstring specifies that the file should be moved to the named directory after the file is closed. Any existing file with the same name is removed before the file is moved to the moveFileToDirectory directory. A .rename subdirectory may be created in the target directory if the target directory is on a different filesystem. This is used to ensure that the files appear atomically in the target directory.

Windowing
The FileSink operator does not accept any window configurations.

Assignments
The FileSink operator does not allow assignments to output attributes.
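The way closeMode : count, tuplesPerFile, and the {id} field in the file name work together can be sketched in Python. This models only the naming and rollover rule described above, not the operator itself; the function names `file_name_for` and `assign_files` are hypothetical, and the sketch groups tuples in memory instead of writing files.

```python
def file_name_for(pattern, file_id):
    """Replace every {id} field in the file name with the file ID."""
    return pattern.replace("{id}", str(file_id))


def assign_files(tuples, pattern, tuples_per_file):
    """Group tuples by the output file they would be written to.

    A new file is started after every tuples_per_file tuples; file IDs
    start at 0 and increase by 1 for each file that is opened.
    """
    files = {}
    for index, item in enumerate(tuples):
        name = file_name_for(pattern, index // tuples_per_file)
        files.setdefault(name, []).append(item)
    return files


print(assign_files(range(5), "myfile{id}.dat", tuples_per_file=2))
# {'myfile0.dat': [0, 1], 'myfile1.dat': [2, 3], 'myfile2.dat': [4]}
```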


Exceptions
The FileSink operator throws an exception and terminates in the following case:
• The output file cannot be opened for writing.

composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}

    // sink operator with the hasDelayField option, and fields separated by ":"
    // rstrings will not be printed with double quotes
    () as Sink1 = FileSink(Beat) {
      param
        file : "/tmp/People.dat";
        format : csv;
        separator : ":";
        hasDelayField : true;
        quoteStrings : false;
    }

    // sink operator with a txt format specifier and compression
    () as Sink2 = FileSink(Beat) {
      param
        file : "People.dat";
        format : txt;
        compression : zlib;
    }

    // sink operator with a bin format specifier and flush option
    () as Sink3 = FileSink(Beat) {
      param
        file : "People.dat";
        format : bin;
        flush : 1u;
    }

    // sink operator with a writePunctuations option and no flushing on punctuation
    () as Sink4 = FileSink(Beat) {
      param
        file : "People.dat";
        writePunctuations : true;
        flushOnPunctuation : false;
    }
}

DirectoryScan
Description
The DirectoryScan operator watches a directory and generates file names on the output, one for each file that is found in the directory. The absolute pathname of the file is generated. A file name is generated only the first time the file is seen during a directory scan, and again only if the file is recreated. The change time (ctime) is used to detect whether a file has been recreated. An output clause and custom output functions can be used to specify additional information about a file. All non-regular files found in the directory are ignored during the scan.

Note: Because the change time of the file is used to detect whether a file has been recreated, very large files may still be in the process of being written when the directory is scanned. In this case, the same file name may be generated multiple times if the time between scans is less than the time needed to write the file. To avoid this, write the file into a different directory on the same filesystem as the directory being scanned, and then rename it into the target directory when it is complete (/bin/mv does this if the files are on the same filesystem). If a regular expression pattern is being used to match only certain files, creating the new file under a name that fails to match the pattern and then renaming it also works.
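The detection rule described above (report a file the first time it is seen, and again only when its change time differs) can be sketched as a single scan pass in Python. This is an illustrative model and not the operator's code: the function name `scan_once`, the `seen` dictionary, matching the pattern against the base file name with a search, and visiting entries in name order are all assumptions.

```python
import os
import re
import stat


def scan_once(directory, seen, pattern=None):
    """Return absolute paths of regular files that are new or have a new ctime.

    seen maps absolute path -> ctime recorded on an earlier scan and is
    updated in place, so the same dictionary is passed to every scan.
    """
    regex = re.compile(pattern) if pattern else None
    found = []
    for name in sorted(os.listdir(directory)):
        if regex and not regex.search(name):
            continue  # file names that do not match the pattern are ignored
        path = os.path.abspath(os.path.join(directory, name))
        try:
            info = os.stat(path)
        except FileNotFoundError:
            continue  # the file disappeared between listing and stat
        if not stat.S_ISREG(info.st_mode):
            continue  # non-regular files are ignored
        if seen.get(path) != info.st_ctime_ns:
            seen[path] = info.st_ctime_ns
            found.append(path)
    return found
```

Calling `scan_once` repeatedly with the same `seen` dictionary reports each unchanged file only once. A writer that creates files elsewhere on the same filesystem and then moves them into place with `os.rename` avoids the partially written file problem described in the note.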


Before submitting the file name to the output stream, the DirectoryScan operator can optionally move processed files to a different directory using the moveToDirectory parameter. If the moveToDirectory parameter is specified, the file (or symbolic link) is moved to the moveToDirectory directory before the output tuple is generated. When moveToDirectory is specified, it is valid to have multiple DirectoryScan operators reading the same directory. The DirectoryScan operator ensures that each file is submitted by only one operator by creating a temporary .rename subdirectory in the directory and moveToDirectory directories.

Input Ports
The DirectoryScan operator does not have any input ports.

Output Ports
The DirectoryScan operator is configurable with a single output port. The output port is non-mutating and its punctuation mode is Free. The output schema for the DirectoryScan operator is a tuple. The generated tuple is populated using the output clause. If there is no output clause, or an attribute in the tuple is not assigned using an output clause, then the attribute must be of type rstring.

Parameters
The DirectoryScan operator has the following parameters:

directory
    This is a mandatory parameter that specifies the name of the directory to be scanned. It is of type rstring.
moveToDirectory
    This optional parameter of type rstring specifies the name of the directory to which files should be moved before the output tuple is generated.
pattern
    This optional parameter of type rstring is used to instruct the DirectoryScan operator to ignore file names that do not match the regular expression pattern.
sortBy
    This optional parameter determines the order in which file names are generated during a single scan of the directory when there are multiple valid files at the same time. The valid values are date and name. If the sortBy parameter is not specified, the default sort order is set to date.
order
    This optional parameter controls how the sortBy parameter sorts the files. The valid values are ascending and descending. If the order parameter is not specified, the default value is set to ascending. If sortBy is set to date, the file with the oldest change time (ctime) is generated first for ascending order. If sortBy is set to name, the file name that is lexically smallest is generated first for ascending order.
sleepTime
    This optional parameter of type float64 specifies the minimal time between scans of the directory, in seconds. If not specified, the default is 5.0 seconds. If the time difference between the start of the last scan and the current time is less than sleepTime seconds, the DirectoryScan operator will sleep until the time since the last scan is sleepTime seconds. If more than sleepTime seconds have already passed, the next scan will begin immediately.
initDelay
    This optional float64 parameter is used to specify the number of seconds that the DirectoryScan operator is to delay before starting to produce tuples.
ignoreDotFiles
    This optional boolean parameter determines if the DirectoryScan operator ignores files with a leading period (.) in the directory. By default, the value is set to false and files with a leading period are processed.
ignoreExistingFilesAtStartup
    This optional boolean parameter determines if the DirectoryScan operator ignores pre-existing files in the directory. By default, the value is set to false and all files are processed as usual. If set to true, any files present in the directory are marked as already processed, and not submitted. If initDelay is specified, this check is done before the DirectoryScan operator delays.

Assignments
The DirectoryScan operator supports the following custom output functions:
- rstring FilePath(): The pathname to the file in the directory, relative to the input directory parameter.
- rstring FileName(): The last component of the pathname.
- rstring FullPath(): The absolute pathname to the file in the directory.
- rstring DestinationFullPath(): The absolute pathname to the file in the destination directory.
- rstring Directory(): The value of the directory parameter.
- rstring DestinationDirectory(): The value of the moveToDirectory parameter, or the directory parameter if moveToDirectory is not specified.
- rstring DestinationFilePath(): The pathname to the file in the destination directory.
- uint64 Size(): The size of the file in bytes.
- uint64 Atime(): The access time (atime) of the file in seconds since the epoch.
- uint64 Ctime(): The change time (ctime) of the file in seconds since the epoch.
- uint64 Mtime(): The modification time (mtime) of the file in seconds since the epoch.

Note: The atime, ctime, and mtime fields are set from the original file in the source directory.

Metrics
The DirectoryScan operator has the following metrics:
- nScans: The number of times the DirectoryScan operator has read the directory.
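The sleepTime rule described under Parameters amounts to a small calculation. The Python sketch below illustrates it. The function name delay_before_next_scan and its arguments are hypothetical and are not part of the toolkit; times are plain seconds.

```python
def delay_before_next_scan(last_scan_start: float, now: float,
                           sleep_time: float = 5.0) -> float:
    """Return how long to sleep before the next directory scan.

    Models the documented rule: scans start at least sleep_time seconds
    apart, measured from the start of the previous scan. The default of
    5.0 seconds mirrors the documented default of the sleepTime parameter.
    """
    elapsed = now - last_scan_start
    # If the previous scan started less than sleep_time seconds ago, wait
    # for the remainder; otherwise the next scan begins immediately.
    return max(0.0, sleep_time - elapsed)
```

For example, if a scan started at time 100.0 and finished at 102.0, the next scan would begin after a further 3.0 seconds with the default setting, and immediately if the scan had taken longer than 5.0 seconds.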


Exceptions
The DirectoryScan operator will throw an exception and terminate in the following cases:
- The directory or moveToDirectory does not exist.
- The directory or moveToDirectory is not a directory.
- The pattern is not a valid regular expression.
- The .rename directories cannot be created when moveToDirectory is specified.

composite Main {
  graph
    // DirectoryScan operator with a relative directory argument
    stream<rstring name> Dir1 = DirectoryScan() {
      param
        directory : "People.dir";
        initDelay : 10.0;
    }
    // DirectoryScan operator with an absolute directory argument and a file name pattern
    stream<rstring name> Dir2 = DirectoryScan() {
      param
        directory : "/tmp/work";
        pattern : "^work.*";
    }
    // use a FileSource operator to process the file names
    stream<rstring line> Beat6 = FileSource(Dir2) {
      param
        // note: param file is not specified
        format : line;
        deleteFile : true; // delete the file when processing is finished
    }
    // Use DirectoryScan operator to move files to a different directory.
    // Move the scanned files to the /tmp/active directory. Generate a tuple containing
    // the original filename in /tmp/work (sourceFile), and the moved filename
    // in /tmp/active (movedFile).
    // Generate the size of the file (fileSize).
    stream<rstring sourceFile, rstring movedFile, uint64 fileSize> Dir3 = DirectoryScan() {
      param
        directory : "/tmp/work";
        moveToDirectory : "/tmp/active";
      output
        Dir3 : sourceFile = FilePath(), movedFile = DestinationFilePath(), fileSize = Size();
    }
}
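To make the filtering and ordering rules of a single scan concrete, here is a small Python model of the behavior described for the pattern, ignoreDotFiles, sortBy, and order parameters and for ctime-based detection of recreated files. It is only an illustration under stated assumptions, not the operator's implementation: FileInfo and scan_once are hypothetical names, the pattern is applied with an unanchored regular expression search, names are compared by code point, and plain names are returned rather than absolute pathnames.

```python
import re
from typing import Dict, List, NamedTuple, Optional


class FileInfo(NamedTuple):
    name: str             # file name within the scanned directory
    ctime: float          # change time, seconds since the epoch
    regular: bool = True  # False for directories, sockets, and so on


def scan_once(files: List[FileInfo], seen: Dict[str, float],
              pattern: Optional[str] = None, ignore_dot_files: bool = False,
              sort_by: str = "date", order: str = "ascending") -> List[str]:
    """Return the names one scan would generate, updating `seen` in place."""
    regex = re.compile(pattern) if pattern is not None else None
    candidates = []
    for f in files:
        if not f.regular:
            continue  # non-regular files are ignored
        if ignore_dot_files and f.name.startswith("."):
            continue
        if regex is not None and regex.search(f.name) is None:
            continue  # assumption: unanchored search unless the pattern anchors itself
        if seen.get(f.name) == f.ctime:
            continue  # already generated and not recreated since
        candidates.append(f)
    # sortBy date orders by ctime (oldest first when ascending);
    # sortBy name orders lexically (smallest first when ascending).
    if sort_by == "date":
        candidates.sort(key=lambda f: f.ctime, reverse=(order == "descending"))
    else:
        candidates.sort(key=lambda f: f.name, reverse=(order == "descending"))
    for f in candidates:
        seen[f.name] = f.ctime  # remember the ctime to detect recreation later
    return [f.name for f in candidates]
```

Calling scan_once repeatedly with the same seen dictionary models successive scans: a file is reported again only after its ctime changes.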

TCPSource
Description
The TCPSource operator reads data from a TCP socket and creates tuples out of it. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes it handles a single connection at a time. It works with both IPv4 and IPv6 addresses.

Input Ports
The TCPSource operator does not have any input ports.

Output Ports
The TCPSource operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating. The TCPSource operator will output a window marker punctuation when a TCP connection terminates.

Parameters
The TCPSource operator has the following parameters:

role
    This mandatory parameter specifies whether the TCPSource operator is server-based or client-based. It takes one of the following two values: server and client.
address
    In the case of a client-based TCPSource operator, this parameter specifies the destination server address of the TCP connection. The address parameter must be specified when the role parameter is set to client and the name parameter is not specified. In all other cases, it cannot be specified. It takes a single value of type rstring. This value could be a host name or an IP address. address may not be used for a server-based TCPSource operator, as the address used is always on the current host.
port
    In the case of a server-based TCPSource operator, this parameter specifies the port address on which the connections will be accepted. In the case of a client-based TCPSource operator, it specifies the destination server port address. It takes a single value of type rstring or type uint32. This could be a well-known port alias, such as "http" or "ftp" (as specified under /etc/services), as well as a plain port number, such as 45134u. It is an optional parameter for server-based TCPSource operators and when omitted its default value is 0, which picks any available port. For client-based TCPSource operators, the port parameter must be specified when the name parameter is not specified, and it cannot be specified otherwise.
name
    In the case of a server-based TCPSource operator, this parameter specifies the name to be used to register the address and port pair for the server with the name service that is part of the Streams runtime. This name can be used by a corresponding client-based TCPSink operator to connect to this operator by just specifying the name. These names are automatically prefixed by the application scope, thus applications with differing scopes cannot communicate through the same name. The application scope can be set through the use of config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators will keep trying periodically to register the name, with an error message for each failure. In the case of a client-based TCPSource, this parameter specifies the name to be used to look up the address and port pair for the destination server from the name service that is part of the Streams runtime. It is an optional parameter that takes a single value of type rstring. The streamtool getnsentry command can be used to query server-based TCPSource addresses. The Value field will contain host:port. When the name parameter is specified in client mode, the port and address parameters cannot be specified.
parsing
    This optional parameter can be specified to customize the parsing behavior of the TCPSource operator. There are three valid values, namely: strict, permissive, and fast. When strict is specified, incorrectly formatted tuples will result in a runtime error and termination of the operator. When permissive is specified, incorrectly formatted tuples will result in a runtime log entry being created, and the parser will make an effort to skip to the next tuple (formats txt and csv) and continue. If format is bin, the parser will close the current connection, and start reading the next connection (if the reconnectionPolicy permits). permissive can only be used with txt, csv, and bin formats. When fast is specified, the input is assumed to be formatted correctly, and no runtime checks will be performed. Incorrect input in fast mode causes undefined behavior. The default parsing mode is strict.
interface
    This optional rstring parameter specifies the network interface to use to register when the name parameter is specified. interface is only valid when role is server and when name is specified. Using interface with name will ensure that a matching operator with a role of client and the same name parameter will use the desired interface.
receiveBufferSize
    This is an optional parameter that is used to override the default kernel receive buffer size. It is of type uint32.
reconnectionPolicy
    This is an optional parameter that specifies the reconnection policy. In the case of a server-based TCPSource operator, this parameter specifies if additional connections are allowed once the initial connection terminates. In the case of a client-based TCPSource operator, this parameter specifies if additional connection attempts will be made once the initial connection to the server terminates. The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. If not specified, it is set to InfiniteRetry. When set to NoRetry, the TCPSource operator produces a final marker punctuation right away, after the initial connection is terminated and a window marker punctuation is sent.
reconnectionBound
    This parameter specifies the number of successive connections that will be attempted for a client-based TCPSource operator or accepted for a server-based TCPSource operator. It is an optional parameter of type uint32. It must appear when the reconnectionPolicy parameter is set to BoundedRetry and cannot appear otherwise.
format
    See the corresponding parameter of the FileSource operator (page 15) for details.
defaultTuple
    See the corresponding parameter of the FileSource operator (page 15) for details.
hasDelayField
    See the corresponding parameter of the FileSource operator (page 15) for details.
compression
    See the corresponding parameter of the FileSource operator (page 15) for details.
encoding
    See the corresponding parameter of the FileSource operator (page 15) for details.


eolMarker
    See the corresponding parameter of the FileSource operator (page 15) for details.
blockSize
    See the corresponding parameter of the FileSource operator (page 15) for details.
initDelay
    See the corresponding parameter of the FileSource operator (page 15) for details.
separator
    See the corresponding parameter of the FileSource operator (page 15) for details.
ignoreExtraCSVValues
    See the corresponding parameter of the FileSource operator (page 15) for details.

Assignments
The TCPSource operator does not allow assignments to output attributes.

Metrics
The TCPSource operator has the following metrics:
- nReconnections: The number of times the TCPSource operator lost connection and reconnected to the other end of the TCP socket.
- nInvalidTuples: The number of tuples that failed to read correctly in csv or txt format.
- nConnections: The number of currently active TCP/IP connections. The value is 0 if the TCPSource operator is waiting for a connection or a reconnection, or 1 if the operator is currently connected.

Exceptions
The TCPSource operator will throw an exception and terminate the operator in the following cases:
- The host cannot be resolved.
- The name cannot be located.
- Unable to set SO_REUSEADDR on TCP socket.
- Unable to bind to port.
composite Main {
  graph
    // server source with an alias string as port
    stream<rstring name, uint32 age, uint64 salary> Beat = TCPSource() {
      param
        role : server;
        port : "ftp";
    }
    // server source with a numeric port
    stream<Beat> Beat1 = TCPSource() {
      param
        role : server;
        port : 23145u;
    }
    // server source with a name, registering interface eth1
    stream<Beat> Beat2 = TCPSource() {
      param
        role : server;
        name : "my_server";
        interface : "eth1";
    }
    // server source with a name and port
    stream<Beat> Beat3 = TCPSource() {
      param
        role : server;
        port : 23145u;
        name : "my_server";
    }
    // server source with a port and infinite reconnection
    stream<Beat> Beat4 = TCPSource() {
      param
        role : server;
        port : "ftp";
        reconnectionPolicy : InfiniteRetry;
    }
    // server source with a port and reconnection (5 times)
    stream<Beat> Beat4r = TCPSource() {
      param
        role : server;
        port : "ftp";
        reconnectionPolicy : BoundedRetry;
        reconnectionBound : 5u;
    }
    // client source with an IP address and port
    stream<Beat> Beat5 = TCPSource() {
      param
        role : client;
        address : "99.2.45.67";
        port : "ftp";
    }
    // client source with a host name as the address
    stream<Beat> Beat6 = TCPSource() {
      param
        role : client;
        address : "mynode.mydomain";
        port : 23145u;
    }
    // client source with name
    stream<Beat> Beat7 = TCPSource() {
      param
        role : client;
        name : "my_server";
    }
    // client source with reconnection
    stream<Beat> Beat8 = TCPSource() {
      param
        role : client;
        address : "mynode.mydomain";
        port : "ftp";
        reconnectionPolicy : InfiniteRetry;
    }
    // client source with bounded reconnection (10 connections)
    // Wait 5 seconds before starting
    stream<Beat> Beat9 = TCPSource() {
      param
        role : client;
        address : "mynode.mydomain";
        port : "ftp";
        reconnectionPolicy : BoundedRetry;
        reconnectionBound : 10u;
        initDelay : 5.0;
    }
}
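The TCPSource parameter descriptions contain several "must be specified" and "cannot be specified" rules that depend on role and name. As a reading aid, the Python sketch below encodes those rules as a checker over a plain dictionary of parameter names. The function validate_tcp_source and the dictionary representation are hypothetical; the SPL compiler performs its own checks, and this sketch covers only the rules quoted in this section.

```python
from typing import Any, Dict, List


def validate_tcp_source(params: Dict[str, Any]) -> List[str]:
    """Return a list of rule violations for a TCPSource-style parameter set."""
    errors: List[str] = []
    role = params.get("role")
    if role not in ("server", "client"):
        return ["role is mandatory and must be server or client"]

    has_name = "name" in params
    if role == "client":
        if has_name:
            # With name in client mode, address and port cannot be specified.
            if "address" in params or "port" in params:
                errors.append("client with name cannot specify address or port")
        else:
            if "address" not in params:
                errors.append("client without name must specify address")
            if "port" not in params:
                errors.append("client without name must specify port")
    else:
        # Server: address is never allowed; port is optional (default 0).
        if "address" in params:
            errors.append("address cannot be specified for a server")

    # interface is only valid for a server that registers a name.
    if "interface" in params and not (role == "server" and has_name):
        errors.append("interface requires role server and a name")

    # reconnectionBound must appear exactly when the policy is BoundedRetry.
    bounded = params.get("reconnectionPolicy") == "BoundedRetry"
    if bounded != ("reconnectionBound" in params):
        errors.append("reconnectionBound is required with BoundedRetry only")
    return errors
```

Applied to the invocations in the example, each parameter set yields an empty list, while a client that specifies both name and port is reported as invalid.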


TCPSink
Description
The TCPSink operator writes data to a TCP socket in the form of tuples. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes it handles a single connection at a time.

Input Ports
The TCPSink operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The TCPSink operator does not have any output ports.

Parameters
The TCPSink operator has the following parameters:

role
    See the corresponding parameter of the TCPSource operator (page 26).
address
    See the corresponding parameter of the TCPSource operator (page 26).
port
    See the corresponding parameter of the TCPSource operator (page 26).
name
    In the case of a server-based TCPSink operator, this parameter specifies the name to be used to register the address and port pair for the server with the name service that is part of the Streams runtime. This name can be used by a corresponding client-based TCPSource operator to connect to this operator by just specifying the name, without the need for an address or port number. These names are automatically prefixed by the application scope, thus applications with differing scopes cannot communicate through the same name. The application scope can be set through the use of config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators will keep trying periodically to register the name, with an error message for each failure. In the case of a client-based TCPSink, this parameter specifies the name to be used to look up the address and port pair for the destination server from the name service that is part of the Streams runtime. It is an optional parameter that takes a single value of type rstring. When the name parameter is specified in client mode, the port and address parameters cannot be specified.
interface
    This optional rstring parameter specifies the network interface to use to register when the name parameter is specified. interface is only valid when role is server and when name is specified. Using interface with name will ensure that a matching operator with a role of client and the same name parameter will use the desired interface.


sendBufferSize
    This is an optional parameter that is used to override the default kernel send buffer size. It is of type uint32.
reconnectionPolicy
    See the corresponding parameter of the TCPSource operator (page 26).
reconnectionBound
    See the corresponding parameter of the TCPSource operator (page 26).
format
    See the corresponding parameter of the FileSink operator (page 20).
hasDelayField
    See the corresponding parameter of the FileSink operator (page 20).
compression
    See the corresponding parameter of the FileSink operator (page 20).
encoding
    See the corresponding parameter of the FileSink operator (page 20).
eolMarker
    See the corresponding parameter of the FileSink operator (page 20).
flush
    See the corresponding parameter of the FileSink operator (page 20).
flushOnPunctuation
    See the corresponding parameter of the FileSink operator (page 20).
writePunctuations
    See the corresponding parameter of the FileSink operator (page 20).
separator
    See the corresponding parameter of the FileSource operator (page 15).
quoteStrings
    See the corresponding parameter of the FileSink operator (page 20).
retryFailedSends
    Retry failed sends until a connection is established and the send succeeds.

Windowing
The TCPSink operator does not accept any window configurations.

Assignments
The TCPSink operator does not allow assignments to output attributes.

Metrics
The TCPSink operator has the following metrics:
- nReconnections: The number of times the TCPSink operator lost the connection and reconnected to the other end of the TCP socket.
- nConnections: The number of currently active TCP/IP connections. The value is 0 if the TCPSink operator is waiting for a connection or a reconnection, or 1 if the operator is currently connected.

Exceptions
The TCPSink operator will throw an exception and terminate the operator in the following cases:
- The host cannot be resolved.
- The name cannot be located.
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Beat = Beacon() {}
    // server sink with an alias string as port
    () as Beat1 = TCPSink(Beat) {
      param
        role : server;
        port : "ftp";
    }
    // server sink with a numeric port
    () as Beat2 = TCPSink(Beat) {
      param
        role : server;
        port : 23145u;
    }
    // server sink with a name
    () as Beat3 = TCPSink(Beat) {
      param
        role : server;
        name : "my_server";
    }
    // server sink with a name and port
    () as Beat4 = TCPSink(Beat) {
      param
        role : server;
        port : 23145u;
        name : "my_server";
    }
    // server sink with a port and infinite reconnection
    () as Beat5 = TCPSink(Beat) {
      param
        role : server;
        port : "ftp";
        reconnectionPolicy : InfiniteRetry;
    }
    // client sink with an IP address and port
    () as Beat6 = TCPSink(Beat) {
      param
        role : client;
        address : "99.2.45.67";
        port : "ftp";
    }
    // client sink with a host name as the address
    () as Beat7 = TCPSink(Beat) {
      param
        role : client;
        address : "mynode.mydomain";
        port : 23145u;
    }
    // client sink with name
    () as Beat8 = TCPSink(Beat) {
      param
        role : client;
        name : "my_server";
    }
    // client sink with reconnection (25 connections)
    () as Beat9 = TCPSink(Beat) {
      param
        role : client;
        address : "mynode.mydomain";
        port : "ftp";
        reconnectionPolicy : BoundedRetry;
        reconnectionBound : 25u;
    }
}

UDPSource
Description
The UDPSource operator reads data from a UDP socket and creates tuples out of it. Each tuple must fit into a single UDP packet and a single UDP packet must contain only a single tuple.

Input Ports
The UDPSource operator does not have any input ports.

Output Ports
The UDPSource operator is configurable with a single output port. The output port is mutating and its punctuation mode is Free.

Parameters
The UDPSource operator has the following parameters:

address
    This parameter specifies the address of the sender whose UDP packets will be accepted. If the address is a multicast address, then all UDP packets destined to that multicast address at the specified port (see the port parameter) are accepted regardless of the sender's address. It is an optional parameter. When not specified, all UDP packets destined for the port will be accepted. It takes a single value of type rstring. This value could be a host name or an IP address.
port
    This parameter specifies the port address on which the UDP packets will be accepted. It takes a single value of type rstring or type uint32. This could be a well-known port alias, such as "http" or "ftp", as well as a plain port number, such as 45134u. It must be specified when the name parameter is not specified, and it is optional otherwise.
receiveBufferSize
    This is an optional parameter that is used to override the default kernel receive buffer size. It is of type uint32.
parsing
    This optional parameter can be specified to customize the parsing behavior of the UDPSource operator. There are three valid values, namely: strict, permissive, and fast. When strict is specified, incorrectly formatted tuples will result in a runtime error and termination of the operator. When permissive is specified, incorrectly formatted tuples will result in a runtime log entry being created, and the parser will continue with the next packet. permissive can only be used with txt, csv, and bin formats. When fast is specified, the input packet is assumed to be formatted correctly, and no runtime checks will be performed. Incorrect input in fast mode causes undefined behavior. The default parsing mode is strict.
interface
    This optional rstring parameter specifies the network interface to use for UDP multicast packets or to register using the name parameter. interface is only valid when address or name is specified. Using interface with name will ensure that UDPSink operators with the same name parameter will use the desired interface.
name
    This parameter specifies the name to be used to register the address (of the node hosting the operator) and port pair for this UDPSource operator with the name service, so that the corresponding UDPSink operator can look it up and send its UDP packets to the registered address at the registered port. It is an optional parameter that takes a single value of type rstring. These names are automatically prefixed with the application scope, thus applications with differing scopes cannot communicate through the same name. The application scope can be set through the use of config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators will keep trying periodically to register the name, with an error message for each failure.
format
    See the corresponding parameter of the FileSource operator (page 15) for details. The formatting applies to each packet individually. For csv, txt, and bin formats, each UDP packet is a single tuple in the corresponding format. For line format, each packet contains a single line and there is no eolMarker parameter. Trailing carriage returns ("\r") and new lines ("\n") will be removed from the string. For block format, each packet contains a single block and there is no blockSize parameter.
defaultTuple
    See the corresponding parameter of the FileSource operator (page 15) for details.
hasDelayField
    See the corresponding parameter of the FileSource operator (page 15) for details.
compression
    See the corresponding parameter of the FileSource operator (page 15) for details. The compression applies to each packet, independently.
encoding
    See the corresponding parameter of the FileSource operator (page 15) for details. The encoding applies to each packet, independently.
initDelay
    See the corresponding parameter of the FileSource operator (page 15) for details.
separator
    See the corresponding parameter of the FileSource operator (page 15) for details.
ignoreExtraCSVValues
    See the corresponding parameter of the FileSource operator (page 15) for details.
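The per-packet handling of the line format can be illustrated with a short Python sketch. The function decode_line_packet is a hypothetical name, the UTF-8 default is an assumption, and the sketch assumes that every trailing "\r" and "\n" character is removed; the text above does not say whether more than one trailing character is stripped.

```python
def decode_line_packet(payload: bytes, encoding: str = "utf-8") -> str:
    """Turn the payload of one UDP packet into one line of text.

    Each packet is treated as exactly one line. Trailing carriage returns
    and new lines are dropped; characters inside the line are kept.
    """
    # bytes.rstrip with b"\r\n" removes any run of trailing \r and \n bytes.
    return payload.rstrip(b"\r\n").decode(encoding)
```

Because each packet is decoded on its own, no end-of-line marker is needed to separate tuples, which matches the absence of an eolMarker parameter for this operator.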


Windowing
The UDPSource operator does not accept any window configurations.

Assignments
The UDPSource operator does not allow assignments to output attributes.

Metrics
The UDPSource operator has the following metrics:
- nInvalidTuples: The number of tuples that failed to read correctly in csv or txt format.

Exceptions
The UDPSource operator will throw an exception and terminate the operator in the following cases:
- The host cannot be resolved.
- The name cannot be located.
- Unable to bind to port.
- Unable to locate interface.
composite Main {
  graph
    // udp socket on local host, using port for "ftp"
    stream<rstring name, uint32 age, uint64 salary> Beat = UDPSource() {
      param
        port : "ftp";
    }
    // udp on local host, accepts packets from "some.node.some.host" on port 23145
    stream<Beat> Beat1 = UDPSource() {
      param
        address : "some.node.some.host";
        port : 23145u;
    }
    // same as above, but using a receive buffer size, and waiting 5 secs at start-up
    stream<Beat> Beat2 = UDPSource() {
      param
        address : "some.node.some.host";
        port : 23145u;
        receiveBufferSize : 10240u;
        initDelay : 5.0;
    }
    // same as above but also registers its location to the name server
    stream<Beat> Beat3 = UDPSource() {
      param
        port : 23145u;
        name : "my_server";
    }
    // udp on local host, uses any port, registers its location to the name server
    stream<Beat> Beat4 = UDPSource() {
      param
        name : "my_server";
    }
    // udp on local host, uses port "ftp", registers its location to the name server
    stream<Beat> Beat5 = UDPSource() {
      param
        port : "ftp";
        name : "my_server";
    }
    // udp on local host, accepts packets sent to a multicast address, on port 23145
    stream<Beat> Beat6 = UDPSource() {
      param
        address : "some.multicast.address";
        port : 23145u;
    }
    // same as above but also registers its location to the name server
    stream<Beat> Beat7 = UDPSource() {
      param
        port : 23145u;
        name : "my_server";
    }
    // same as above but uses a specific interface for receiving packets
    stream<Beat> Beat8 = UDPSource() {
      param
        address : "some.multicast.address";
        port : 23145u;
        interface : "ib1";
    }
}

UDPSink
Description
The UDPSink operator writes data to a UDP socket in the form of tuples. Each tuple must fit into a single UDP packet and a single UDP packet contains only a single tuple.

Input Ports
The UDPSink operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.

Output Ports
The UDPSink operator does not have any output ports.

Parameters
The UDPSink operator has the following parameters:

address
    This parameter specifies the address of the receiver where the UDP packets will be sent. It must be specified when the name parameter is not specified and it cannot be specified otherwise. It takes a single value of type rstring. This value could be a host name or an IP address.
port
    This parameter specifies the port address to which the UDP packets will be sent. It must be specified when the name parameter is not specified and it cannot be specified otherwise. See the corresponding parameter of the UDPSource operator (page 34) for details on the type and valid values.
sendBufferSize
    This is an optional parameter that is used to override the default kernel send buffer size. It is of type uint32.
name
    This parameter specifies the name to be used to look up the address and port pair for the UDPSource operator from the name service, so that this UDPSink operator can send its UDP packets to the registered address at the registered port. It is an optional parameter that takes a single value of type rstring. These names are automatically prefixed with the application scope, thus applications with differing scopes cannot communicate through the same name. The application scope can be set through the use of config applicationScope on the main composite in the application. It is an error for a name with the same application scope to be defined multiple times within an instance. If multiple operators attempt to define the same name, the second and subsequent operators will keep trying periodically to register the name, with an error message for each failure.

  format
    See the corresponding parameter in the FileSink operator on page 20.
  hasDelayField
    See the corresponding parameter in the FileSink operator on page 20.
  compression
    See the corresponding parameter in the FileSink operator on page 20. The compression applies to each packet, independently.
  encoding
    See the corresponding parameter in the FileSink operator on page 20. The encoding applies to each packet, independently.
  writePunctuations
    See the corresponding parameter in the FileSink operator on page 20.
  separator
    See the corresponding parameter in the FileSink operator on page 20.
  quoteStrings
    See the corresponding parameter in the FileSink operator on page 20.
Windowing
  The UDPSink operator does not accept any window configurations.
Assignments
  The UDPSink operator does not allow assignments to output attributes.
Exceptions
  The UDPSink operator will throw an exception and terminate the operator in the following cases:
  - The host cannot be resolved.
  - The name cannot be located.
composite Main {
  graph
    stream<rstring name, uint32 age, uint64 salary> Employees = Beacon() {}
    // send messages to "some.node.some.host", using port for "ftp"
    () as Beat = UDPSink(Employees) {
      param
        address : "some.node.some.host";
        port : "ftp";
    }
    // similar to above, but employ a certain send buffer size
    () as Beat1 = UDPSink(Employees) {
      param
        address : "some.node.some.host";
        port : 80u;
        sendBufferSize : 10240u;
    }
    // send messages to the source registered at the name server
    () as Beat2 = UDPSink(Employees) {
      param
        name : "my_server";
    }
}


Export
Description
  The Export operator sends a stream from the current application, making it available to Import operators of applications running in the same streaming middleware instance. Note that the Export operator does not send tuples to the exported stream inside standalone SPL applications. The tuples are ignored. The Export operator does not allow a config clause to be specified.
Input Ports
  The Export operator has a single, punctuation-oblivious, non-mutating input port. The Export operator does not permit custom port logic to be specified in its invocations.
Output Ports
  The Export operator does not have any output ports.
Parameters
  The Export operator has the following parameters:
  properties
    This is an optional parameter, which specifies a tuple literal giving name-value pairs. The supported attribute types are int64, float64, rstring, list<int64>, list<float64>, and list<rstring>.
  streamId
    This optional parameter of type rstring specifies the external name of the stream being exported. Only one of streamId or properties can be specified on an Export operator. If neither properties nor streamId is specified, the Export operator exports by properties, with empty properties.
Windowing
  The Export operator does not accept windows on the input port.
Assignments
  The Export operator does not allow assignments to output attributes.
composite Main {
  graph
    stream<int32 i> TechSectorBargains = Beacon() { }
    stream<int32 i> HealthCareSectorBargains = Beacon() { }
    () as ExportOp1 = Export(TechSectorBargains) {
      param
        properties : { kind="bargains", category="tech",
                       tickers=["IBM", "GOOG", "MSFT"] };
    }
    () as ExportOp2 = Export(HealthCareSectorBargains) {
      param
        streamId : "someStreamName";
    }
}

Import
Description
  The Import operator receives tuples from streams made available by Export operators of applications running in the same streaming middleware instance. Import streams will be matched to Export streams. The match may be done by subscription/properties or by streamId name. For detailed information about how streams are matched see "Dynamic application composition" in the IBM Streams Processing Language Specification. Note that the Import operator will not generate tuples inside standalone SPL applications. The Import operator does not allow a config clause to be specified. The Import operator behaves as multiple streams when connected to an input port that is punctuation-expecting.
Input Ports
  The Import operator does not have any input ports.
Output Ports
  The Import operator has a single, non-mutating, punctuation-preserving output port.
Parameters
  The Import operator has the following parameters:
  applicationScope
    This optional parameter of type rstring specifies the application scope that was selected when the exporting application was launched. If the application scope is not specified, "Default" is used.
  applicationName
    This optional parameter of type rstring gives the name of the exporting application. If not specified, the fully qualified name of the main composite is used ([namespace::]compositeName). This parameter is only valid when streamId is specified.
  subscription
    This optional parameter specifies a boolean predicate, which can refer to names defined by the properties of any Export operator within the Import operator's application scope and with a matching schema.
  streamId
    This optional parameter of type rstring specifies the external name of the stream being imported. Only one of streamId or subscription can be specified on an Import operator. If neither the streamId nor the subscription parameter is provided, the Import operator is a subscription import, with an empty subscription that will not match any Export properties.
Windowing
  The Import operator does not accept any window configurations.
Assignments
  The Import operator does not allow assignments to output attributes.
composite Main {
  type
    SomeTupleType = int32 x;
  graph
    stream<SomeTupleType> I2 = Import() {
      param
        // application scope selected when exporting application launched
        applicationScope : "myApplicationScope";
        // main operator selected when exporting application launched
        applicationName : "some.nameSpace::MainOp";
        // outside-in name in case of nested composite invokes
        streamId : "StreamName";
    }
    stream<SomeTupleType> TechBargains = Import() {
      param
        subscription : kind == "bargains" && category == "tech";
    }
}
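
The two ways of matching an Import to an Export (by streamId or by subscription against properties) can be modelled outside SPL. The following Python sketch is illustrative only. The Export class and both function names are hypothetical, the subscription is represented as a Python predicate over a property dictionary, schema matching is left out, and treating a reference to an absent property as a non-match is an assumption. The authoritative rules are in "Dynamic application composition" in the IBM Streams Processing Language Specification.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Export:
    """Hypothetical stand-in for one Export operator invocation."""
    scope: str = "Default"
    application: str = "Main"
    stream_id: Optional[str] = None
    properties: dict = field(default_factory=dict)


def matches_by_id(exp, scope, application, stream_id):
    """streamId import: scope, application name and stream name must agree."""
    return (exp.stream_id is not None
            and exp.scope == scope
            and exp.application == application
            and exp.stream_id == stream_id)


def matches_by_subscription(exp, scope, predicate):
    """Subscription import: evaluate the predicate on the exported properties."""
    if exp.stream_id is not None or exp.scope != scope:
        return False
    try:
        return bool(predicate(exp.properties))
    except KeyError:
        # Assumption: a predicate that names an absent property does not match.
        return False


# The two exports from the Export example, in the default application scope.
tech = Export(properties={"kind": "bargains", "category": "tech",
                          "tickers": ["IBM", "GOOG", "MSFT"]})
health = Export(stream_id="someStreamName")


def subscription(p):
    # Mirrors: kind == "bargains" && category == "tech"
    return p["kind"] == "bargains" and p["category"] == "tech"
```

In this model the TechBargains subscription matches only the export that carries properties, and the export with a streamId can only be reached by naming it.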


MetricsSink
Description
  The MetricsSink operator creates Custom Operator Metrics and updates them with values when a tuple is received. These metrics can be viewed using Streams Studio and the streamtool capturestate command. For more information about viewing metrics information, see the IBM InfoSphere Streams: Studio Installation and User's Guide. For more information about the performance metrics that are available for the hosts and jobs in a Streams instance, see the IBM InfoSphere Streams: Installation and Administration Guide. Each operator metric has kind Gauge. For more information about the metrics, see the IBM Streams Processing Language Operator Model Reference.
Input Ports
  The MetricsSink operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.
Output Ports
  The MetricsSink operator does not have any output ports.
Parameters
  The MetricsSink operator has the following parameters:
  metrics
    This is a mandatory parameter that specifies a list of int64 expressions that will be used to set the values of Custom Operator Metrics. Each expression will set the value for one metric.
  names
    This is an optional parameter that specifies a list of rstring expressions that are to be used for the names of the Custom Operator Metrics. If present, the number of expressions must match the number of expressions in the metrics parameter. If omitted, the metric names will default to the SPL expression for each metric.
  descriptions
    This is an optional parameter that specifies a list of rstring expressions that are to be used for the descriptions of the Custom Operator Metrics. If present, the number of expressions must match the number of expressions in the metrics parameter. If omitted, the metric descriptions will default to the empty string ("").
  initialValues
    This is an optional parameter that specifies a list of int64 expressions that are to be used for the starting values of the Custom Operator Metrics. If present, the number of expressions must match the number of expressions in the metrics parameter. If omitted, the initial metric values will default to 0.
composite Main {
  graph
    stream<int64 a, int64 b> A = Beacon() {}
    () as Nil = MetricsSink(A) {
      param
        metrics : a, b, a + b, a * b;
        names : "a", "b", "sum", "product";
        descriptions : "A", "B", "sum of A and B", "product of A and B";
        initialValues : 100l, 1000l, -900l, 5l;
    }
}
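
The defaulting and length rules for names, descriptions, and initialValues can be summarized in a few lines. This Python sketch is only a model of the parameter rules stated above. The function name resolve_metrics and the dictionary layout are hypothetical, and the metric expressions are passed as plain strings.

```python
def resolve_metrics(metrics, names=None, descriptions=None, initial_values=None):
    """metrics: list of SPL expression strings. Returns one dict per metric."""
    n = len(metrics)
    optional = {"names": names, "descriptions": descriptions,
                "initialValues": initial_values}
    # Each optional list, when present, must have one entry per metric.
    for label, values in optional.items():
        if values is not None and len(values) != n:
            raise ValueError(f"{label} needs {n} entries, got {len(values)}")
    names = names if names is not None else list(metrics)  # default: expression text
    descriptions = descriptions if descriptions is not None else [""] * n
    initial_values = initial_values if initial_values is not None else [0] * n
    return [{"name": nm, "description": d, "value": v}
            for nm, d, v in zip(names, descriptions, initial_values)]
```

For example, resolve_metrics(["a", "b", "a + b", "a * b"]) names the third metric "a + b" and starts every metric at 0.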


Chapter 3. Utility Operators


Custom
Description
  The Custom operator is a special logic-related operator that can receive and send any number of streams and does not do anything by itself. Thus, it offers a blank slate for customization. The Custom operator can submit tuples from within its onTuple and onPunct clauses. It requires special support from the compiler. The submit intrinsic function is only available in the Custom operator, and has the following signatures:

    <tuple T> void submit (T tupleValue, T port)
    <tuple T> void submit (enum{WindowMarker, FinalMarker} punctuation, T port)

Input Ports
  The Custom operator can have one or more input ports. The input ports are mutating and their punctuation mode is Oblivious.
Output Ports
  The Custom operator can have zero or more output ports. The output ports are mutating and their punctuation mode is Generating. However, if the tuple is referenced after a submit call, the compiler ensures that the tuple is unchanged by submitting a copy of the tuple.
Parameters
  The Custom operator does not have any parameters.
Windowing
  The Custom operator does not accept any window configurations.
Assignments
  The Custom operator does not allow assignments to output attributes.


composite Main {
  graph
    stream<rstring color, int32 intensity> Sensors = Beacon() {}
    stream<rstring color> Queries = Beacon() {}
    stream<rstring key, int32 val> Output = Custom(Sensors; Queries) {
      logic
        state : mutable map<rstring, int32> m;
        onTuple Sensors : m[color] = intensity;
        onTuple Queries : if (color in m)
                            submit({key=color, val=m[color]}, Output);
        onPunct Queries : if (currentPunct() == Sys.WindowMarker)
                            submit(Sys.WindowMarker, Output);
    }
}

Beacon
Description
  The Beacon operator is a utility source that generates tuples on-the-fly.
Input Ports
  The Beacon operator does not have any input ports.
Output Ports
  The Beacon operator is configurable with a single output port. The output port is non-mutating and its punctuation mode is Generating. The Beacon operator will output a window marker punctuation when it finishes generating tuples.
Parameters
  The Beacon operator has the following parameters:
  period
    This optional parameter specifies the time interval between successive tuple submissions, in seconds. It takes a single expression of type float64 as its value. When not specified, it is assumed to be 0.0.
  iterations
    This optional parameter specifies the number of tuples to be produced by the Beacon operator. It takes a single expression of type uint32 or int32 as its value, which is evaluated as uint32. When not specified, the Beacon operator produces tuples until the application is shut down.
  initDelay
    This optional float64 parameter is used to specify the number of seconds that the Beacon operator is to delay before starting to produce tuples.
Windowing
  The Beacon operator does not accept any window configurations.
Assignments
  The Beacon operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are assigned their default initializers.
composite Main {
  graph
    // with no options
    stream<rstring name, uint32 age> Beat1 = Beacon() {}
    // with iterations
    stream<rstring name, uint32 age> Beat2 = Beacon() {
      param
        iterations : 10u;
    }
    // with iterations and period
    stream<rstring name, uint32 age> Beat3 = Beacon() {
      param
        period : 0.2;
        iterations : 10u;
    }
    // with output assignments and an initial delay of 5 seconds
    stream<rstring name, uint32 age> Beat4 = Beacon() {
      param
        period : 0.2;
        initDelay : 5.0;
      output
        Beat4 : name = "SPL"+(rstring)(random()*10.0),
                age = (uint32)(random()*40.0);
    }
}

Throttle
Description
  The Throttle operator is used to pace a stream to make it flow at a specified rate.
Input Ports
  The Throttle operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.
Output Ports
  The Throttle operator is configurable with a single output port. The output port is non-mutating and its punctuation mode is Preserving. The schema of the output port must match that of the input port.
Parameters
  The Throttle operator has the following parameters:
  rate
    This is a mandatory parameter, which specifies the desired rate in tuples per second. It is of type float64.
  period
    This is an optional parameter, which specifies the period, in seconds, to be used for maintaining the desired rate. When making rate adjustments the Throttle operator will only consider the last period, going back from the current time. It is of type float64. By default the period is set to 10.0/rate.
  includePunctuations
    This is an optional parameter, which specifies whether punctuations are to be included in the rate computation. It is of type boolean. Its default value is false.
  precise
    This is an optional parameter of type boolean that specifies whether precise blocking should be used. Some systems lack the required resolution to block for very small durations. In such cases, this option enables precise blocking, using busy wait.
Windowing
  The Throttle operator does not accept any window configurations.
Assignments
  The Throttle operator does not allow assignments to output attributes.
composite Main {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {}
    // with rate only
    stream<rstring name, uint32 age> Throttled1 = Throttle(Beat) {
      param
        rate : 100.0;
    }
    // with optional period
    stream<rstring name, uint32 age> Throttled2 = Throttle(Beat) {
      param
        rate : 100.0;
        period : 0.05;
    }
    // with punctuations included
    stream<rstring name, uint32 age> Throttled3 = Throttle(Beat) {
      param
        rate : 100.0;
        period : 0.05;
        includePunctuations : true;
    }
}
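
The interaction between rate and period can be illustrated with a small simulation. This reference does not spell out the operator's internal algorithm, so the Python sketch below is an assumption: it uses a plain sliding window that lets at most rate * period tuples through in any interval of length period. The function name throttle and the use of simulated timestamps are hypothetical. Only the default period of 10.0/rate is taken from the parameter description.

```python
from collections import deque


def throttle(arrivals, rate, period=None):
    """Return release times for tuples arriving at the given times (seconds)."""
    if period is None:
        period = 10.0 / rate                 # default from the parameter description
    budget = max(1, round(rate * period))    # tuples allowed per window (assumption)
    window = deque()                         # release times still inside the window
    released = []
    for t in arrivals:
        # A tuple cannot overtake the one before it.
        now = max(t, released[-1]) if released else t
        while window and window[0] <= now - period:
            window.popleft()
        if len(window) >= budget:
            # Window is full: wait until the oldest release ages out of it.
            now = window.popleft() + period
            while window and window[0] <= now - period:
                window.popleft()
        window.append(now)
        released.append(now)
    return released
```

Under this model, five tuples that all arrive at time 0.0 with rate 2.0 and period 1.0 are released at 0.0, 0.0, 1.0, 1.0, and 2.0, while tuples that already arrive slower than the rate pass through unchanged. A shorter period smooths out bursts, and a longer one tolerates them.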

Delay
Description
  The Delay operator is used to delay a stream by a given amount while keeping the inter-arrival times of tuples and punctuations intact.
Input Ports
  The Delay operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.
Output Ports
  The Delay operator is configurable with a single output port. The output port is non-mutating and its punctuation mode is Preserving. The output port schema must match the input port schema.
Parameters
  The Delay operator has the following parameters:
  delay
    This is a mandatory parameter of type float64, which specifies the delay to be introduced in seconds.
  bufferSize
    This is an optional parameter of type uint32, which specifies the maximum number of tuples and punctuations that can be buffered by the Delay operator. Its default value is 1000. When the buffer is full, the incoming tuples and punctuations are blocked until there is space in the buffer. The final punctuation marker is not forwarded until the buffer is drained.
Windowing
  The Delay operator does not accept any window configurations.
Assignments
  The Delay operator does not allow assignments to output attributes.
composite Main {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {}
    // with delay only
    stream<rstring name, uint32 age> Delayed1 = Delay(Beat) {
      param
        delay : 2.5;
    }
    // with delay and buffer size
    stream<rstring name, uint32 age> Delayed2 = Delay(Beat) {
      param
        delay : 2.5;
        bufferSize : 250u;
    }
}
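
The effect of delay and bufferSize on timing can be shown with simulated timestamps. This Python sketch is a model under stated assumptions, not the operator's implementation. The function name delay_schedule is hypothetical, arrival times are assumed to be sorted, and a blocked item is assumed to enter the buffer at the moment the item bufferSize positions ahead of it is released.

```python
def delay_schedule(arrivals, delay, buffer_size=1000):
    """Return the release time of each item, given sorted arrival times in seconds."""
    released = []
    for i, t in enumerate(arrivals):
        if i >= buffer_size:
            # Buffer full: the item waits until the item buffer_size ahead has left.
            t = max(t, released[i - buffer_size])
        released.append(t + delay)
    return released
```

With a delay of 2.5, arrivals at 0.0, 0.5, and 2.0 are released at 2.5, 3.0, and 4.5, so the gaps of 0.5 and 1.5 seconds are preserved. With buffer_size=1 the same arrivals are released at 2.5, 5.0, and 7.5, because each item has to wait for the previous one to leave the buffer.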

Barrier
Description
  The Barrier operator is used to synchronize tuples from two or more streams. Corresponding tuples from each input port are used to create an output tuple. The Barrier operator will create and send this output tuple only when all the tuples from the input ports are received.
Input Ports
  The Barrier operator is configurable with two or more input ports. The input ports are non-mutating and their punctuation mode is Oblivious. The Barrier operator does not permit custom port logic to be specified in its invocations.
Output Ports
  The Barrier operator is configurable with a single output port. The output port is mutating and its punctuation mode is Free.
Parameters
  The Barrier operator has the following parameters:
  bufferSize
    This optional parameter specifies an expression of type uint32 to be used for limiting the size of the internal buffer used to queue up tuples from an input port that do not yet have matching tuples from other ports. When the buffer fills up, it results in blocking until more space is available. By default, the buffer has no limit. The default option is often used to synchronize the results from performing parallel tasks on the same stream, as in:

    [Figure 1. Synchronizing the results from performing parallel tasks on the same stream (one Source feeding two Oper operators, both feeding a Barrier)]

    The bufferSize parameter is often used when the Barrier operator is used to synchronize streams that originate from different sources with potentially differing rates, as in:

    [Figure 2. Synchronizing streams that originate from different sources with potentially differing rates (two Source operators feeding a Barrier)]

    However, this kind of usage implies that the Barrier operator is being used to synchronize logically unrelated streams. Such usage is discouraged.
  partitionBy<i>
    This optional parameter specifies one or more expressions to be used for partitioning the input tuples from the port at index i, where the synchronization applies to matching partitions from different ports. When specified, one partitionBy parameter for each input port must appear. partitionByLHS is a synonym for partitionBy0 and partitionByRHS is a synonym for partitionBy<n>, where n is the index of the last input port (that is, n+1 is the number of input ports). All partitionBy parameters must have the same number of expressions, and the expressions at corresponding positions across different parameters must have matching types.
Assignments
  The Barrier operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the Barrier operator expects all output tuple attributes to be completely assigned.
composite Main {
  graph
    stream<rstring name, rstring gender, uint32 age> BeatA = Beacon() { param period : 1f; }
    stream<rstring name, rstring gender, rstring job> BeatB = Beacon() { param period : 1f; }
    stream<rstring name, rstring gender, uint64 salary> BeatC = Beacon() {}
    // with no buffer size, used for parallel tasks
    stream<rstring name, rstring gender, uint32 age> OpA = Functor(BeatA) {}
    stream<rstring name, rstring gender, rstring job> OpB = Functor(BeatB) {}
    stream<rstring name, rstring gender, uint32 age, rstring job> Res1 = Barrier(OpA; OpB) {}
    // with buffer size, used for synchronizing independent sources
    stream<rstring name> OtherBeat = Beacon() {}
    stream<rstring name1, rstring name2, rstring gender> Res2 = Barrier(BeatA; OtherBeat) {
      param
        bufferSize : 1000u;
      output
        Res2 : name1 = BeatA.name, name2 = OtherBeat.name;
    }
    // with partitioning
    stream<rstring name, rstring gender, uint32 age, rstring job> Res3 = Barrier(OpA; OpB) {
      param
        partitionByLHS : OpA.name;
        partitionByRHS : OpB.name;
    }
    // with partitioning and more than two ports
    stream<rstring name, rstring gender, uint64 salary> OpC = Functor(BeatC) {}
    stream<rstring name, rstring gender, uint32 age, rstring job, uint64 salary> Res4 = Barrier(OpA; OpB; OpC) {
      param
        partitionBy0 : OpA.name, OpA.gender;
        partitionBy1 : OpB.name, OpB.gender;
        partitionBy2 : OpC.name, OpC.gender;
    }
}
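
The synchronization and partitioning behavior can be modelled with one queue per input port and per partition. The Python sketch below is a simplified illustration. The class name and the representation of tuples as dictionaries are hypothetical, bufferSize blocking is left out, and letting later ports overwrite attributes with the same name is an arbitrary choice for the sketch rather than documented behavior.

```python
from collections import defaultdict, deque


class Barrier:
    """Hypothetical model: emit one merged tuple once every port has contributed."""

    def __init__(self, num_ports, partition_by=None):
        self.num_ports = num_ports
        # One key function per port; without partitioning every tuple shares one key.
        self.partition_by = partition_by or [lambda t: None] * num_ports
        # partition key -> one FIFO queue per input port
        self.queues = defaultdict(lambda: [deque() for _ in range(num_ports)])

    def process(self, port, tup):
        """Queue a tuple (a dict); return the merged output tuple, or None."""
        queues = self.queues[self.partition_by[port](tup)]
        queues[port].append(tup)
        if not all(queues):          # some port still has nothing to contribute
            return None
        merged = {}
        for q in queues:             # later ports overwrite clashing names (assumption)
            merged.update(q.popleft())
        return merged
```

Without partitioning, tuples are matched strictly in arrival order per port. With a key function per port, as in the partitionByLHS and partitionByRHS example, a tuple waits only for tuples with the same key on the other ports.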

Pair
Description
  The Pair operator is used to pair tuples from two or more streams. Corresponding tuples from each input port are used to create a series of output tuples. The Pair operator will create and send these output tuples only when all the tuples from the input ports are received. It operates exactly like the Barrier operator, with one major difference: rather than combining the input tuples into one output tuple, it will output them individually, using the order of the input ports, followed by a window marker. As a result, all the input ports and the output port must have the same schema.
Input Ports
  The Pair operator is configurable with two or more input ports. The input ports are non-mutating and their punctuation mode is Oblivious. The Pair operator does not permit custom port logic to be specified in its invocations.
Output Ports
  The Pair operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating.
Parameters
  The Pair operator has the following parameters:
  bufferSize
    See the corresponding parameter in the Barrier operator on page 46.
  partitionBy<i>
    See the corresponding parameter in the Barrier operator on page 46.
Windowing
  The Pair operator does not accept any window configurations.
Assignments
  The Pair operator does not allow assignments to output attributes.
composite Main {
  graph
    // with no buffer size, used for parallel tasks
    stream<rstring name, uint32 value> OpA = Beacon() {}
    stream<rstring name, uint32 value> OpB = Beacon() {}
    stream<rstring name, uint32 value> Res1 = Pair(OpA; OpB) {}
    // with buffer size, used for synchronizing independent sources
    stream<rstring name, uint32 value> OpC = Beacon() {}
    stream<rstring name, uint32 value> Res2 = Pair(OpA; OpC) {
      param
        bufferSize : 1000u;
    }
    // with partitioning
    stream<rstring name, uint32 value> Res3 = Pair(OpA; OpB) {
      param
        partitionByLHS : OpA.name;
        partitionByRHS : OpB.name;
    }
}
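
The output order produced by pairing can be shown with a short model. In this Python sketch the function name pair and the WINDOW_MARKER constant are hypothetical stand-ins, tuples are arbitrary Python values, and partitioning and bufferSize are omitted.

```python
from collections import deque

WINDOW_MARKER = "WindowMarker"  # stand-in for the window punctuation


def pair(events, num_ports):
    """events: iterable of (port, tuple). Returns tuples and markers in output order."""
    queues = [deque() for _ in range(num_ports)]
    out = []
    for port, tup in events:
        queues[port].append(tup)
        if all(queues):                               # every port has a tuple waiting
            out.extend(q.popleft() for q in queues)   # emit in input-port order
            out.append(WINDOW_MARKER)
    return out
```

For two ports receiving a1, a2 on port 0 and then b1, b2 on port 1, the result is a1, b1, marker, a2, b2, marker.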

Split
Description
  The Split operator is used to split a stream into one or more output streams, based on a user-specified split condition.
Input Ports
  The Split operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious.
Output Ports
  The Split operator is configurable with one or more output ports. The output ports are non-mutating and their punctuation mode is Preserving. The Split operator requires that the stream type of the input port matches the stream types of the output ports.
Parameters
  The Split operator has the following parameters:
  index
    This optional parameter specifies an expression of type int64 or uint64 or a list type of these (list<int64> or list<uint64>) to be used for forwarding input tuples to selected output ports. The values specified by the index parameter are indices that determine the output ports to be used for forwarding. Indexing is 0 based. A negative index results in not forwarding the tuple. An index that is greater than the highest output port index is reduced modulo the number of output ports. A common use case is to evenly distribute the tuples across the output ports using the <any T> uint64 hashCode(T k) function and relying on the automatic modulo operation performed on the number of ports. A repeated index within a list of indices results in forwarding the tuple to the corresponding output port as many times as the index is repeated. When the index parameter is omitted, the file and key parameters must be specified.
  file
    This optional parameter of type rstring specifies a file name to be used for looking up the mapping that determines how input tuples are forwarded to the output ports. The file is organized as a list of mappings, where each mapping is a line of text that contains a key (see the key parameter below) and a list of indices. The indices determine the list of output ports to be used for forwarding a tuple that matches the associated key. The mapping from the input tuple to the key is specified by the key parameter. When the file parameter is specified, the key parameter must also be specified. Each mapping line is formatted similarly to a line in a csv file (see FileSource on page 15 for rules governing csv files), yet different mappings can have different numbers of comma-separated values (different numbers of indices). There is a special key value called default, which specifies the forwarding indices for tuples that do not map to a key in the file. By default, missing keys will cause a runtime exception to be thrown. Repeated keys in the mapping file also result in a runtime exception. The latter exception is thrown at mapping file load time, rather than lookup time. The # character can be used for commenting in the mapping file. An example mapping file is given below:

      # mapping.txt
      default, -1 # drop non-matching items
      # when the default index line is absent,
      # non-matching items will result in runtime error
      # format: column 1 is the split key. The remaining columns
      # represent the output port indices to be used for forwarding
      "Harvey Smith", 0
      "John Paul Jones", 0
      "Ringo Starr", 0, 1 # send to multiple output ports
      "Bugra Gedik", 1

  key
    This optional parameter specifies a key value to be used for looking up a mapping from the mapping file specified by the file parameter. It is a single expression of any type. When the key parameter is specified, the file parameter must also be specified.
Windowing
  The Split operator does not accept any window configurations.
Assignments
  The Split operator does not allow assignments to output attributes.
Exceptions
  The Split operator will throw an exception and terminate the operator in the following cases:
  - The mapping file specified by the file parameter cannot be opened for reading.
  - A line in the mapping file cannot be parsed.
  - A key in the mapping file occurs more than once.
  - The key cannot be matched, and there is no default mapping.
composite Main {
  graph
    stream<rstring name, uint32 age> Beat = Beacon() {}
    // single index
    (stream<rstring name, uint32 age> LeftBeat1;
     stream<rstring name, uint32 age> RightBeat1) = Split(Beat) {
      param
        index : hashCode(name);
    }
    // list of indices
    (stream<rstring name, uint32 age> AllBeat2;
     stream<rstring name, uint32 age> LeftBeat2;
     stream<rstring name, uint32 age> RightBeat2) = Split(Beat) {
      param
        index : [0, (age<=30u)?1:2];
    }
    // from a file
    (stream<rstring name, uint32 age> LeftBeat3;
     stream<rstring name, uint32 age> RightBeat3) = Split(Beat) {
      param
        file : "mapping.txt";
        key : name;
    }
}
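
The index rules (negative indices drop, large indices wrap, repeated indices duplicate) and the mapping-file lookup can be modelled as follows. This Python sketch is an illustration under assumptions. The function names route, load_mapping, and lookup are hypothetical, Python's csv module stands in for the csv rules referenced above, and comment stripping is simplified: a # inside a quoted key is not handled.

```python
import csv


def route(index, num_ports):
    """Return the output ports for one tuple, given one index or a list of indices."""
    indices = index if isinstance(index, list) else [index]
    # Negative indices drop the tuple; larger ones wrap modulo the port count.
    return [i % num_ports for i in indices if i >= 0]


def load_mapping(lines):
    """Parse mapping-file lines into {key: [indices]}; a repeated key is an error."""
    mapping = {}
    for raw in lines:
        line = raw.split("#", 1)[0].strip()   # '#' starts a comment (not quote-aware)
        if not line:
            continue
        key, *indices = next(csv.reader([line], skipinitialspace=True))
        if key in mapping:
            raise ValueError(f"repeated key: {key}")
        mapping[key] = [int(i) for i in indices]
    return mapping


def lookup(mapping, key):
    """Look up a key, falling back to the 'default' entry; raise if neither exists."""
    if key in mapping:
        return mapping[key]
    if "default" in mapping:
        return mapping["default"]
    raise KeyError(key)
```

For example, route(5, 2) gives [1], route(-1, 3) gives an empty list, and with the example mapping file a tuple keyed "Ringo Starr" is routed to ports 0 and 1 while an unknown key falls back to the default entry and is dropped.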


DeDuplicate
Description
  The DeDuplicate operator suppresses duplicate tuples that are seen within a given time period.
Input Ports
  The DeDuplicate operator has one input port, which is non-mutating. Its punctuation mode is Oblivious.
Output Ports
  The DeDuplicate operator has one or two output ports. The output ports are non-mutating and their punctuation mode is Preserving. The second output port is optional. The stream type of the second output port must match that of the input port. The first output port will receive the tuples that have not been seen within the time period. The second output port (if present) will receive the tuples that have been seen within the time period.
Parameters
  The DeDuplicate operator has the following parameters:
  count
    This is an optional parameter of type int64. If used, identical tuples are suppressed within the next count tuples received. count may not be used with timeOut or deltaAttribute.
  delta
    This parameter specifies the difference between the original value of deltaAttribute on an unduplicated tuple, and that of the current tuple. If the difference is greater than delta, the tuple will be emitted.
  deltaAttribute
    This is an optional attribute parameter, analogous to the delta function for window clauses. Together with delta, duplicate tuples will be suppressed unless the original value of deltaAttribute has increased by more than delta. The attribute must be of an integral, floating point, enum, or timestamp type. deltaAttribute may not be used with count or timeOut.
  flushOnPunctuation
    This optional parameter of type boolean controls whether a punctuation will cause the DeDuplicate operator to forget all history of remembered tuples. If not present, false is assumed. If the value is true, all remembered keys will be erased when a punctuation is received.
  key
    This is an optional list of expressions that will be used to determine if a tuple is a duplicate. If omitted, the whole tuple will be used as the key.
  resetOnDuplicate
    This is an optional parameter of type boolean that determines if a duplicate tuple that is suppressed causes the timeOut, count, or deltaAttribute on the saved tuple to be reset to that of the current value. timeOut will reset the time to the current time, count will reset to the current tuple number, and deltaAttribute will reset to the current value of the attribute. If not specified, the default is true.
  timeOut
    This is an optional parameter of type float64 that specifies the number of seconds during which no duplicate of a tuple will be emitted. If not specified, 600.0 seconds (10 minutes) will be used as the default. Identical tuples, separated by more than timeOut seconds, will be seen on the output port. timeOut may not be used with count or deltaAttribute.
  Note: Tuples are retained by the DeDuplicate operator until timeOut seconds elapse, count tuples are processed, or until delta is large enough. If the rate of incoming unique tuples is large, then large values of these parameters could cause the DeDuplicate operator to occupy a huge amount of memory.
Windowing
  The DeDuplicate operator does not accept any window configurations.
Assignments
  The first output port of the DeDuplicate operator allows assignments to output attributes. The output tuple attributes whose assignments are not specified are automatically forwarded from the input ones. After the automatic forwarding, the DeDuplicate operator expects all output tuple attributes to be completely assigned.
composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param
        period : 0.1;
      output
        Data : name = "test", age = (uint32)(random()*80.0);
    }
    // only pass tuples that have unique ages in the last minute
    stream<rstring name, uint32 age> Out = DeDuplicate(Data) {
      param
        timeOut : 60.0; // only remember for 60 seconds
        key : age / 3u; // expression to be used for checking duplicates
    }
}


// detect duplicates based on a couple of attribute values
// Duplicated data will go to the Duplicates stream
(stream<rstring name, rstring country> Out; stream<Data> Duplicates) = DeDuplicate (Data) {
  param timeOut : 60.0;
        key : name, country; // duplicate checking uses both name and country.
  // "Harvey", "Canada" is not a duplicate of "Harvey", "USA"
}
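The effect of resetOnDuplicate can be sketched as follows. This is a minimal illustration rather than a toolkit sample; the stream names and the choice of age as the key are assumptions made for the example.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {
      param period : 0.1;
      output Data : name = "test", age = (uint32)(random() * 80.0);
    }
    // Suppress repeats of an age for 60 seconds.
    // Because resetOnDuplicate is false, a suppressed duplicate does not
    // reset the time recorded on the saved tuple.
    stream<Data> Out = DeDuplicate(Data) {
      param
        timeOut          : 60.0;
        key              : age;
        resetOnDuplicate : false;
    }
}
```

With the default (resetOnDuplicate set to true), each suppressed duplicate resets the saved time to the current time, so a key that keeps arriving more often than every 60 seconds would stay suppressed.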

Union
Description The Union operator combines tuples from streams connected to different input ports.

Input Ports The Union operator has two or more input ports. Each input port must have a schema that contains attributes of the same name and type as those of the output port. The order of the attributes need not match the order found in the output port. The input port schemas may contain additional attributes that do not appear in the output port schema. The input ports are non-mutating.

Output Ports The Union operator has one output port. The output port is mutating and its punctuation mode is Free.

Parameters The Union operator does not have any parameters.


Windowing The Union operator does not accept any window configurations.

Assignments The Union operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.
composite Main {
  graph
    stream<rstring a, int32 d, rstring b> A = Beacon() {}
    stream<rstring a, rstring b, uint32 c> B = Beacon() {}
    // Accept all tuples from A and B, and grab the b and a fields from each
    stream<rstring b, rstring a> C = Union (A; B) {}
}

ThreadedSplit
Description The ThreadedSplit operator splits tuples across multiple output ports to improve concurrency. One thread is used for each output port to submit tuples as quickly as possible. It is not possible to determine in advance to which output port a given tuple will be sent. The ThreadedSplit operator maintains one buffer for each output port. When all the buffers fill up, the ThreadedSplit operator busy waits until there is space in one of them to add a new tuple, therefore blocking the input tuple. When a thread submitting tuples encounters an empty buffer, it busy waits until a tuple arrives at its buffer. The ThreadedSplit operator is not a direct replacement for the Split operator, because it does not allow choosing to which stream a tuple is sent.

Input Ports The ThreadedSplit operator has one input port, which is non-mutating. Its punctuation mode is Oblivious.

Output Ports The ThreadedSplit operator is configurable with one or more output ports. The schema for each output port must match the schema of the input port. The output ports are mutating and their punctuation mode is Preserving.

Parameters The ThreadedSplit operator takes only one parameter:

bufferSize This is a mandatory parameter of type uint32 that specifies the size of each buffer used to store the input tuples for each output port. The final punctuation marker is not forwarded until all buffers are drained.

Windowing The ThreadedSplit operator does not accept any window configurations.

Assignments The ThreadedSplit operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.
composite Main {
  graph
    stream<rstring name> Input = Beacon () {}
    // split tuples to two output streams.
    (stream<rstring name> Out1; stream<rstring name> Out2) = ThreadedSplit(Input) {
      param bufferSize: 1000u;
    }
}

DynamicFilter
Description The DynamicFilter operator is a version of the Filter operator that can decide at run time which input tuples are passed through, based on control input that it receives.

Input Ports The DynamicFilter operator has two or three input ports. The first port is the stream of tuples to be filtered. The tuples on the first port are passed through the DynamicFilter operator if their key matches a valid key within the operator. The second port is a stream of tuples that contain expressions to be added to the valid keys in the DynamicFilter operator. The third port is a stream of tuples that contain expressions to be removed from the valid keys in the DynamicFilter operator. The first two input ports are mandatory, whereas the third one is optional. All input ports are non-mutating.

Output Ports The DynamicFilter operator has one or two output ports. The first output port is mandatory, and tuples from the first input stream that contain valid keys are submitted to the first output port. The second output port is optional and, if present, receives the tuples that do not contain a valid key. The schemas of the output ports must match the schema of the first input port. The output ports are non-mutating and their punctuation mode is Preserving.

Parameters The DynamicFilter operator has the following parameters:

key This is a list of expressions that are used to determine the key of the input tuple. It is a mandatory parameter.

addKey This is a list of expressions that are used to add keys to the set of valid keys. It is a mandatory parameter. The number and types of the addKey expressions must match those of the key parameter.

removeKey This is a list of expressions that are used to remove keys from the set of valid keys. It is an optional parameter. It must be specified when the DynamicFilter operator is configured with three input ports, and cannot be specified otherwise. The number and types of the removeKey expressions must match those of the key parameter.

Windowing The DynamicFilter operator does not accept any window configurations.

Assignments The DynamicFilter operator does not allow assignments to output attributes. The output tuple attributes are automatically forwarded from the input ones.
composite Main {
  graph
    stream<rstring key, uint32 age> Data = Beacon () {}
    stream<uint32 age> AddKeys = Beacon () {}
    stream<uint32 removeKey> RemoveKeys = Beacon () {}
    // Only pass tuples that match the current key
    stream<rstring key, uint32 age> Out = DynamicFilter(Data; AddKeys; RemoveKeys) {
      param key : Data.age; // key expression to be used for matches
            addKey : AddKeys.age - 1u; // value to be added to valid keys
            removeKey : RemoveKeys.removeKey + 1u; // value to be removed from valid keys
    }
}
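The optional ports can be combined differently. The following minimal sketch, which is not a toolkit sample, omits the third input port (so removeKey cannot be specified) and adds the optional second output port; the stream names Matched and Unmatched are illustrative.

```spl
composite Main {
  graph
    stream<rstring key, uint32 age> Data = Beacon() {}
    stream<uint32 age> AddKeys = Beacon() {}
    // Two input ports only: keys can be added but never removed.
    // Tuples whose age is a valid key go to Matched, all others to Unmatched.
    (stream<Data> Matched; stream<Data> Unmatched) = DynamicFilter(Data; AddKeys) {
      param
        key    : Data.age;
        addKey : AddKeys.age;
    }
}
```

Both output schemas are declared as stream<Data> so that they match the schema of the first input port, as the operator requires.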

Gate
Description The Gate operator is used to control the rate at which tuples are passed through. Each tuple that passes through the Gate operator must be acknowledged by a downstream operator before more tuples are let through.

Input Ports The Gate operator has two input ports. The first port is the data port, and holds the tuples to be gated. The second input port is the control port, and is used to acknowledge data tuples. Both input ports are non-mutating and their punctuation mode is Oblivious.

Output Ports The Gate operator is configurable with a single output port. The output port is non-mutating and its punctuation mode is Preserving, using punctuation from the first (data) port. The schema of the output port must match that of the first input port.

Parameters The Gate operator has the following parameters:

maxUnackedTupleCount This is a mandatory parameter of type uint32, which specifies the number of tuples allowed through the Gate operator before they are acknowledged.

numTuplesToAck This is an optional parameter of type uint32, which specifies the number of tuples to be acknowledged for each control tuple received. If not specified, the value is 1u.
composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {}
    stream<Data> Gated = Gate(Data; Control) {
      param maxUnackedTupleCount : 1u; // allow only 1 tuple to go through at a time
            numTuplesToAck : Control.count; // acknowledge given number of tuples
    }
    (stream<Data> Output; stream<uint32 count> Control) = Custom(Gated) {
      logic onTuple Gated: {
        println (Gated); // do some processing
        submit(Gated, Output); // pass it on to the output
        submit({count = 1u}, Control); // tell gate to allow another tuple
      }
      // no more than 1 tuple will be queued up at any time
      config threadedPort : queue(Gated, Sys.Wait);
    }
}
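Because numTuplesToAck is an expression, one control tuple can acknowledge several data tuples. The following is a minimal sketch, not a toolkit sample, in which the downstream operator acknowledges tuples in batches; the batch size of 10 and the pending counter are assumptions made for illustration.

```spl
composite Main {
  graph
    stream<rstring name, uint32 age> Data = Beacon() {}

    // Let at most 10 unacknowledged tuples through the gate
    stream<Data> Gated = Gate(Data; Control) {
      param
        maxUnackedTupleCount : 10u;
        numTuplesToAck       : Control.count; // tuples acknowledged per control tuple
    }

    (stream<Data> Output; stream<uint32 count> Control) = Custom(Gated) {
      logic
        state : mutable uint32 pending = 0u; // processed but not yet acknowledged
        onTuple Gated : {
          submit(Gated, Output);
          pending++;
          if (pending == 10u) {
            // one control tuple acknowledges the whole batch
            submit({count = pending}, Control);
            pending = 0u;
          }
        }
      config threadedPort : queue(Gated, Sys.Wait);
    }
}
```

In a design like this the batch size should not exceed maxUnackedTupleCount. Otherwise the Gate operator would stop passing tuples before the downstream operator had seen enough of them to send an acknowledgement.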


Implementing Load Balancing of Tuples


A ThreadedSplit operator may be used to split a stream and process tuples in parallel. This works for many applications, but if the processing time of a tuple varies considerably depending on the tuple data, a tuple with a long processing time may cause subsequent tuples to back up in the stream waiting for processing, even though another thread may be available and idle. The problem may be aggravated by tuples that are in transit on a TCP/IP connection to another PE.

To ensure load balancing, a ThreadedSplit operator with a buffer size of 1 may be tied to two or more Gate operators with a maximum unacknowledged tuple count of 1 or more. The ThreadedSplit and the Gate operators must be on the same PE, to avoid tuples queuing between PEs. A partitionColocation placement config is used to ensure this. Each Gate operator allows only the specified number of tuples (1 in this case) to pass at a time, and waits until a subsequent operator has acknowledged receipt of the tuple before passing the next tuple. In this manner, no tuple is ever queued behind another, waiting to be processed.

Here is an abstract example of load balancing:
composite Main {
  graph
    // Generate a stream of data to process
    stream<uint32 i> I = Beacon() {
      logic state : mutable uint32 c = 0u;
      output I : i = c++;
    }

    // Split the stream into 2 streams. Use a following Gate to ensure load balancing
    (stream<I> X; stream<I> Y) = ThreadedSplit(I) {
      param bufferSize : 1u;
      config placement : partitionColocation("Split"), // ensure same PE as the Gates
                         partitionExlocation("Process");
    }

    stream<I> O0 = Gate(X; Control0) {
      param maxUnackedTupleCount : 1u;
      config placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit
    }

    stream<I> O1 = Gate(Y; Control1) {
      param maxUnackedTupleCount : 1u;
      config placement : partitionColocation("Split"); // ensure same PE as ThreadedSplit
    }

    (stream<I> R0 as out; stream<uint32 i> Control0 as control) = Custom(O0 as In) {
      logic onTuple In : {
        // do some processing
        submit(In, out); // forward tuple
        submit({ i = 1u }, control);
      }
      // Place on a different PE from Gate or other processing operator
      config placement : partitionExlocation("Process");
    }

    (stream<I> R1 as out; stream<uint32 i> Control1 as control) = Custom(O1 as In) {
      logic onTuple In : {
        // do some processing
        submit(In, out); // forward tuple
        submit({ i = 1u }, control);
      }
      // Place on a different PE from Gate and other processing operator
      config placement : partitionExlocation("Process");
    }
}

JavaOp
Description The JavaOp operator is used to call out to operators implemented in Java using the Java Operator API.

Input Ports The JavaOp operator is configurable with zero or more input ports.

Output Ports The JavaOp operator is configurable with zero or more output ports.

Parameters The JavaOp operator has the following constant parameters:

generated This optional boolean parameter, if set to true, indicates that the compilation generates classes and interfaces specific to the operator and its ports. By default it is set to false.

className This required parameter defines the full class name of the operator that is used to process and submit tuples, as an rstring expression.

classLibrary This required parameter defines an application-specific class path required by the operator's class. It is defined as one or more rstring expressions, each of which corresponds to a JAR file or a directory containing Java classes.

vmArg This optional parameter contains command-line arguments to be passed to the Java virtual machine within which the operator is executed. It consists of one or more rstring expressions.

In addition to the parameters listed above, arbitrary constant parameters are allowed. These parameters are specific to the operator and can be read by the operator's class. For more information about the JavaOp operator and its API, see the IBM Streams Processing Language Toolkit Development Reference.

Windowing The JavaOp operator accepts any window configuration, including not windowed.

Assignments The JavaOp operator does not allow assignments to output attributes.
// Invocation of Java operator using the JavaOp operator
// from the SPL toolkit.
//
// Creates a stream containing the Java VM's system properties
stream<ustring name, ustring value, set<ustring> tags> SystemProperties = JavaOp() {
  param
    // Class name of the Java Operator to be invoked.
    // Set to a class that implements com.ibm.streams.operator.Operator
    className : "com.ibm.streams.operator.samples.sources.SystemPropertySource";

    // Where the class is located. A list of jar files or
    // directories containing the required classes for the
    // Operator instance.
    // In this case the class is from the set of samples
    // provided with the Java Operator API and thus is always
    // in the class path.
    classLibrary: "";

    // Arbitrary JVM arguments can be passed using vmArg.
    // When multiple Java operator invocations are fused
    // together or executed in standalone mode they must have
    // identical vmArg values.
    // To avoid different vmArg settings, it is recommended
    // that system properties set in vmArg are limited to
    // those that must be set during JVM startup. Other
    // properties may be set by initialization Java code of the
    // Operator instance.
    vmArg: "-Xmx128m";

    // JavaOp allows additional arbitrary parameters that
    // are made available to the Operator instance through
    // methods on the OperatorContext API. A parameter
    // may have one or more values.
    // The SystemPropertySource class puts the value of the parameter
    // tags as the tags attribute for any system property
    // that starts with the value of the tagged parameter.
    tagged: "java.";
    tags: "system", "vm";
}
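For an operator class that is not on the default class path, classLibrary and vmArg can each take several values. The fragment below is a hedged sketch only: the class com.example.ops.UpperCase, the JAR file, the directory, and the system property are hypothetical placeholders, and Lines is assumed to be an existing stream with the schema shown. How relative class path entries are resolved is not covered here.

```spl
// Hypothetical invocation: the class name, the class path entries, and
// the -D property below are placeholders, not part of the toolkit.
// Lines is assumed to be an existing stream<rstring line>.
stream<rstring line> Upper = JavaOp(Lines) {
  param
    className    : "com.example.ops.UpperCase";
    // one JAR file and one directory of classes
    classLibrary : "lib/example-ops.jar", "classes";
    // several JVM arguments, each as its own rstring expression
    vmArg        : "-Xmx256m", "-Dexample.mode=test";
}
```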


Chapter 4. Compat Operators


V1TCPSource
Description The V1TCPSource operator reads binary data from a TCP socket that is connected to an InfoSphere Streams Version 1 TCP Sink operator, and creates tuples from it. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes it handles a single connection at a time. It works with both IPv4 and IPv6 addresses. The Version 1 TCP Sink must have the binFormat modifier. The V1TCPSource operator is in the spl.compat namespace, and must be included through a use statement.

Input Ports The V1TCPSource operator does not have any input ports.

Output Ports The V1TCPSource operator is configurable with a single output port. The output port is mutating and its punctuation mode is Generating. The V1TCPSource operator outputs a window marker punctuation when a TCP connection terminates. The tuple generated by the V1TCPSource operator must only contain attributes of the following types: int8, int16, int32, int64, float32, float64, boolean, rstring, and list<T>, where T is one of the previous types. These correspond to the types in Version 1 Streams that are supported by SPL.

Parameters The operator has the following parameters:

role This mandatory parameter specifies whether the V1TCPSource operator is server-based or client-based. It takes one of the following two values: server and client.

address In the case of a client-based V1TCPSource operator, this parameter specifies the destination address of the TCP connection. The address parameter must be specified when the role parameter is set to client and must not be specified when the role parameter is set to server. It takes a single value of type rstring. This value can be a host name or an IP address.

port In the case of a server-based V1TCPSource operator, this parameter specifies the port address on which the connections will be accepted. In the case of a client-based V1TCPSource operator, it specifies the destination port address. It takes a single value of type rstring or type uint32. This can be a well-known port alias, such as "http" or "ftp" (as specified under /etc/services), or a plain port number, such as 45134u. It is an optional parameter for server-based V1TCPSource operators and when omitted its default value is 0, which picks any available port. For client-based V1TCPSource operators, the port parameter must be specified.

receiveBufferSize This is an optional parameter that is used to override the default kernel receive buffer size. It is of type uint32.

reconnectionPolicy This is an optional parameter that specifies the reconnection policy. In the case of a server-based V1TCPSource operator, this parameter specifies whether additional connections are allowed once the initial connection terminates. In the case of a client-based V1TCPSource operator, this parameter specifies whether additional connection attempts will be made once the initial connection to the server terminates. The valid values are: NoRetry, InfiniteRetry, and BoundedRetry. If not specified, it is set to InfiniteRetry. When set to NoRetry, the V1TCPSource operator produces a final marker punctuation right away, after the initial connection is terminated and a window marker punctuation is sent.

reconnectionBound This parameter specifies the number of successive connections that will be attempted for a client-based V1TCPSource operator or accepted for a server-based V1TCPSource operator. It is an optional parameter of type uint32. It must appear when the reconnectionPolicy parameter is set to BoundedRetry and cannot appear otherwise.

initDelay See the corresponding parameter of the FileSource operator.

separator See the corresponding parameter of the FileSource operator.

Windowing The V1TCPSource operator does not accept any window configurations.

Assignments The V1TCPSource operator does not allow assignments to output attributes.

Metrics The V1TCPSource operator has the following metrics:
- nReconnections: The number of times the V1TCPSource operator lost connection and reconnected to the other end of the TCP socket.
- nInvalidTuples: The number of tuples that failed to read correctly in csv or txt format.

Exceptions The V1TCPSource operator will throw an exception and terminate the operator in the following cases:
- The host cannot be resolved.
- Unable to set SO_REUSEADDR on TCP socket.
- Unable to bind to port.
composite Main {
  graph
    // Read from an InfoSphere Streams V1 system
    stream<rstring name, int32 age, int64 salary> v1Data = spl.compat::V1TCPSource()
    {
      param port : 4000u;
            role : server;
    }
}


Corresponding V1 TCP Sink (a TCP client that connects to the server-based V1TCPSource operator on port 4000):

Nil := Sink(A) ["ctcp://thishost:4000/", binFormat] {}
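The V1TCPSource example in this section is server-based. For comparison, a client-based invocation with a bounded number of reconnection attempts might look like the following sketch; the host name v1host, the port 4002, and the bound of 5 are placeholder values chosen for illustration.

```spl
composite Main {
  graph
    // Connect as a client to a Version 1 TCP sink that listens on v1host:4002
    // (host name and port are placeholders)
    stream<rstring name, int32 age, int64 salary> v1Data = spl.compat::V1TCPSource() {
      param
        role               : client;
        address            : "v1host";      // required when role is client
        port               : 4002u;         // required when role is client
        reconnectionPolicy : BoundedRetry;
        reconnectionBound  : 5u;            // required with BoundedRetry
    }
}
```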

V1TCPSink
Description The V1TCPSink operator writes binary data to a TCP socket that is connected to an InfoSphere Streams Version 1 TCP Source operator. It can be configured as a TCP server (listens for a client connection) or as a TCP client (initiates a connection to a server). In both modes it handles a single connection at a time. The V1TCPSink operator works with both IPv4 and IPv6 addresses. The Version 1 TCP Source must have the binFormat modifier. The V1TCPSink operator is in the spl.compat namespace, and must be included through a use statement.

Input Ports The V1TCPSink operator is configurable with a single input port. The input port is non-mutating and its punctuation mode is Oblivious. The tuple accepted by the V1TCPSink operator must only contain attributes of the following types: int8, int16, int32, int64, float32, float64, boolean, rstring, and list<T>, where T is one of the previous types. These correspond to the types in Version 1 Streams that are supported by SPL.

Output Ports The V1TCPSink operator does not have any output ports.

Parameters The V1TCPSink operator has the following parameters:

role See the corresponding parameter of the V1TCPSource operator.

address See the corresponding parameter of the V1TCPSource operator.

port See the corresponding parameter of the V1TCPSource operator.

sendBufferSize This is an optional parameter that is used to override the default kernel send buffer size. It is of type uint32.

reconnectionPolicy See the corresponding parameter of the V1TCPSource operator.

reconnectionBound See the corresponding parameter of the V1TCPSource operator.

flush See the corresponding parameter of the FileSink operator.

Windowing The V1TCPSink operator does not accept any window configurations.

Assignments The V1TCPSink operator does not allow assignments to output attributes.


Metrics The V1TCPSink operator has the following metrics:
- nReconnections: The number of times the V1TCPSink operator lost the connection and reconnected to the other end of the TCP socket.

Exceptions The V1TCPSink operator will throw an exception and terminate the operator in the following cases:
- The host cannot be resolved.
composite Main {
  graph
    stream<rstring name, int32 age, int32 salary> A = Beacon() {}
    // Send the tuple out to an InfoSphere Streams V1 system
    () as Nil = spl.compat::V1TCPSink(A)
    {
      param
        address : "localhost";
        port : 4001u;
        role : client;
        flush : 1u;
    }
}

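Corresponding V1 TCP Source (a TCP server on port 4001 that accepts the connection from the client-based V1TCPSink operator):

stream FromV2 (schemaFor(T)) := Source() ["stcp://thishost:4001/", binFormat] {}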
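The V1TCPSink example in this section is client-based. A server-based invocation, in which the Version 1 TCP Source initiates the connection, might be sketched as follows; the port 4003 is a placeholder value. The address parameter is omitted because, as described for the V1TCPSource operator, it must not be specified when role is server.

```spl
composite Main {
  graph
    stream<rstring name, int32 age, int32 salary> A = Beacon() {}
    // Listen on port 4003 (placeholder) for a connection from a Version 1 TCP source
    () as Nil = spl.compat::V1TCPSink(A) {
      param
        role : server;
        port : 4003u;
    }
}
```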

Compat.Sample
The KeyedAggregate, KeyedDelay, KeyedJoin, Normalize, and PunctMerger operators have been ported from the SPADE sample operators for developers who used these operators. However, these operators are deprecated and will not be available in the next release of InfoSphere Streams. Use these operators only as a temporary solution for porting applications that relied on those samples from SPADE.


Notices
This information was developed for products and services offered in the U.S.A. Information about non-IBM products is based on information available at the time of first publication of this document and is subject to change. IBM may not offer the products, services, or features discussed in this document in other countries. Consult your local IBM representative for information on the products and services currently available in your area. Any reference to an IBM product, program, or service is not intended to state or imply that only that IBM product, program, or service may be used. Any functionally equivalent product, program, or service that does not infringe any IBM intellectual property right may be used instead. However, it is the user's responsibility to evaluate and verify the operation of any non-IBM product, program, or service. IBM may have patents or pending patent applications covering subject matter described in this document. The furnishing of this document does not grant you any license to these patents. You can send license inquiries, in writing, to: IBM Director of Licensing IBM Corporation North Castle Drive Armonk, NY 10504-1785 U.S.A. For license inquiries regarding double-byte character set (DBCS) information, contact the IBM Intellectual Property Department in your country or send inquiries, in writing, to: Intellectual Property Licensing Legal and Intellectual Property Law IBM Japan Ltd. 1623-14, Shimotsuruma, Yamato-shi Kanagawa 242-8502 Japan The following paragraph does not apply to the United Kingdom or any other country/region where such provisions are inconsistent with local law: INTERNATIONAL BUSINESS MACHINES CORPORATION PROVIDES THIS PUBLICATION AS IS WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. 
Some states do not allow disclaimer of express or implied warranties in certain transactions; therefore, this statement may not apply to you. This information could include technical inaccuracies or typographical errors. Changes are periodically made to the information herein; these changes will be incorporated in new editions of the publication. IBM may make improvements and/or changes in the product(s) and/or the program(s) described in this publication at any time without notice.


Any references in this information to non-IBM Web sites are provided for convenience only and do not in any manner serve as an endorsement of those Web sites. The materials at those Web sites are not part of the materials for this IBM product and use of those Web sites is at your own risk. IBM may use or distribute any of the information you supply in any way it believes appropriate without incurring any obligation to you. Licensees of this program who wish to have information about it for the purpose of enabling: (i) the exchange of information between independently created programs and other programs (including this one) and (ii) the mutual use of the information that has been exchanged, should contact: IBM Canada Limited Office of the Lab Director 8200 Warden Avenue Markham, Ontario L6G 1C7 CANADA Such information may be available, subject to appropriate terms and conditions, including, in some cases, payment of a fee. The licensed program described in this document and all licensed material available for it are provided by IBM under terms of the IBM Customer Agreement, IBM International Program License Agreement, or any equivalent agreement between us. Any performance data contained herein was determined in a controlled environment. Therefore, the results obtained in other operating environments may vary significantly. Some measurements may have been made on development-level systems, and there is no guarantee that these measurements will be the same on generally available systems. Furthermore, some measurements may have been estimated through extrapolation. Actual results may vary. Users of this document should verify the applicable data for their specific environment. Information concerning non-IBM products was obtained from the suppliers of those products, their published announcements, or other publicly available sources. 
IBM has not tested those products and cannot confirm the accuracy of performance, compatibility, or any other claims related to non-IBM products. Questions on the capabilities of non-IBM products should be addressed to the suppliers of those products. All statements regarding IBM's future direction or intent are subject to change or withdrawal without notice, and represent goals and objectives only. This information may contain examples of data and reports used in daily business operations. To illustrate them as completely as possible, the examples include the names of individuals, companies, brands, and products. All of these names are fictitious, and any similarity to the names and addresses used by an actual business enterprise is entirely coincidental.

COPYRIGHT LICENSE: This information contains sample application programs, in source language, which illustrate programming techniques on various operating platforms. You may copy, modify, and distribute these sample programs in any form without payment to IBM for the purposes of developing, using, marketing, or distributing application programs conforming to the application programming interface for the operating platform for which the sample programs are written. These examples have not been thoroughly tested under all conditions. IBM, therefore, cannot guarantee or imply reliability, serviceability, or function of these programs. The sample programs are provided AS IS, without warranty of any kind. IBM shall not be liable for any damages arising out of your use of the sample programs. Each copy or any portion of these sample programs or any derivative work must include a copyright notice as follows: (your company name) (year). Portions of this code are derived from IBM Corp. Sample Programs. Copyright IBM Corp. _enter the year or years_. All rights reserved.

Trademarks
IBM, the IBM logo, ibm.com and InfoSphere are trademarks or registered trademarks of International Business Machines Corp., registered in many jurisdictions worldwide. A current list of IBM trademarks is available on the Web at Copyright and trademark information at www.ibm.com/legal/copytrade.shtml.

The following terms are trademarks or registered trademarks of other companies:
- Linux is a registered trademark of Linus Torvalds in the United States, other countries, or both.
- Java and all Java-based trademarks and logos are trademarks of Sun Microsystems, Inc. in the United States, other countries, or both.
- UNIX is a registered trademark of The Open Group in the United States and other countries.
- Microsoft, Windows, Windows NT, and the Windows logo are trademarks of Microsoft Corporation in the United States, other countries, or both.

Other product and service names might be trademarks of IBM or other companies.


Printed in USA
