InfoSphere Streams——實時大數據分析平臺
了解 ,它是 IBM 大數據平臺的一部分。 解決了針對能夠實時處理生成的海量流數據的平臺和架構的一種迫切需求。了解該產品的設計目標,它適用于哪些時機,其工作原理,以及它如何為 InfoSphere BigInsights 提供補充來執行高度復雜的分析。
來自多個來源的信息正在以難以置信的速度增長。互聯網用戶數量在 2015 年已經達到 22.7 億。每一天,Twitter 都會生成超過 12 TB 的 tweet,Facebook 生成超過 25 TB 日志數據,紐約證券交易所采集 1 TB 交易信息。每天會創建大約 300 億個射頻識別 (RFID) 標記。此外,每年銷售的數億臺 GPS 設備,目前正在使用的超過 3000 萬個連網的傳感器(而且每年在以高于 30% 的速度增長),都在產生數據。這些數據量預計在未來 10 年中每 2 年就會翻一番。
一家公司在一年時間內可生成高達數 PB 的信息:網頁、博客、單擊流、搜索索引、社交媒體論壇、即時消息、文本消息、電子郵件、文檔、用戶人口統計數據、來自主動和被動系統的傳感器數據,等等。許多人估計,這些數據中高達 80% 都是半結構化或非結構化數據。公司一直在尋求更加敏捷地經營業務,以更加創新的方式執行數據分析和決策流程。而且他們認識到,這些流程中損失的時間可能導致錯失業務機會。挑戰的核心是,公司掌握輕松地分析和理解互聯網級信息的能力,就像他們現在可分析和理解較少量結構化信息一樣。

IBM 正在幫助公司應對大數據挑戰,為他們提供工具來集成和管理海量、高速產生的數據,應用原生格式的分析,可視化可用數據以進行專門分析,等等。本文將介紹 InfoSphere Streams,該技術支持您同時分析許多數據類型并實時執行復雜計算。您將了解 InfoSphere Streams 的工作原理,它的用途,以及如何結合使用它與另一個用于的 IBM 產品(IBM InfoSphere BigInsights)來執行高度復雜的分析。
InfoSphere BigInsights:概述
理解 InfoSphere BigInsights 將會使您能夠更全面地理解 InfoSphere Streams 的用途和價值。
BigInsights 是一個分析平臺,可幫助公司將復雜的互聯網級信息集轉換為洞察。它包含一個套裝的 Apache Hadoop 發行版(具有高度簡化的安裝流程)和用于應用程序開發、數據移動和集群管理的關聯工具。得益于簡單性和可伸縮性,Hadoop(MapReduce 框架的一種開源實現)在行業和學術界獲得的巨大的成功。除了 Hadoop 之外,BigInsights 中的其他開源技術(除 Jaql 外的所有技術都屬于 Apache Software Foundation 項目)包括:
- Pig:該平臺提供了一種高級語言來表達分析大數據集的程序。Pig 配備了一個編譯器,可將 Pig 程序轉換為 Hadoop 框架執行的 MapReduce 作業序列。
- Hive:一個構建于 Hadoop 環境之上的數據倉庫解決方案。它為 Hadoop 的非結構化世界帶來了人們熟悉的關系數據庫概念,比如表、列和分區,以及 SQL 的一個子集 (HiveQL)。Hive 查詢被編譯為使用 Hadoop 執行的 MapReduce 作業。
- Jaql:IBM 專為 JSON(JavaScript Object Notation,JavaScript 對象表示法)開發的一種查詢語言,提供了一種類似 SQL 的接口。Jaql 適度地處理嵌套,高度面向函數,而且非常靈活。它適用于松散的結構化數據,是 HBase 列存儲和文本分析的接口。
- HBase:一個面向列的 NoSQL 數據存儲環境,旨在支持 Hadoop 中大型、稀疏填充的表。
- Flume:一種分布式、可靠且可用的服務,用于高效地移動生成的大量數據。Flume 非常適合從多個系統中收集生成的日志,在它們插入 HDFS(Hadoop Distributed File System,Hadoop 分布式文件系統)。
- Lucene:一個搜索引擎庫,提供了高性能的、全功能的文本搜索。
- Avro:一種數據序列化技術,使用 JSON 來定義數據類型和協議,以一種緊湊的二進制格式對數據執行序列化。
- ZooKeeper:一種維護配置信息和命名,提供分布式同步和分組服務的集中化服務。
- Oozie:一個工作流調度程序系統,用于管理和編排 Apache Hadoop 作業的執行過程。
此外,BigInsights 發行版還包含以下 IBM 獨有的技術:
- BigSheets:一種基于瀏覽器、類似電子表格的查詢和探索接口,使業務用戶能夠輕松地收集和分析數據,利用 Hadoop 的強大功能。它提供了內置的閱讀器,可處理多種常見格式的數據,包括 JSON、逗號分隔值 (CSV) 和制表符分隔值 (TSV)。
- Text analytics:常見業務實體的文本注釋符的一個預先構建的庫。它提供了豐富的語言和工具來構建自定義位置注釋符。
- Adaptive MapReduce:一個 IBM Research 解決方案,通過更改 MapReduce 任務的處理方式來加速小型 MapReduce 作業的執行。
InfoSphere 平臺
是一個綜合性的信息集成平臺,包含數據倉庫和分析、信息集成、主數據管理、生命周期管理,以及數據安全和隱私。該平臺改進了應用程序開發流程,所以組織可以加快價值實現速度,減少集成成本,并提高信息質量。
一般來講,BigInsights 的設計并不是為了取代一種傳統的關系數據庫管理系統 (DBMS) 或傳統的數據倉庫。具體來講,它沒有針對對表列數據結構的交互式查詢、在線分析處理 (OLAP) 或在線事務處理 (OLTP) 應用程序而優化。但是,作為 IBM 大數據平臺的組成部分,BigInsights 提供了與該平臺的其他組件(包括數據倉庫、數據集成和治理引擎)和第三方數據分析工具的潛在集成點。在本文后面將會看到,它還可與 InfoSphere Streams 集成。
流計算:一種新的計算模式
流計算是新的數據聲場場景所不可或缺的一種新計算模式,比如無處不在的移動設備、位置服務和遍布各處的傳感器。人們需要可伸縮的計算平臺和并行架構來處理生成的海量流數據。
BigInsights 技術不足以支持實時流處理任務,因為它們主要面向對靜態數據的批處理的支持。在處理靜態數據的過程中,列出所有已連網的用戶這樣的查詢會得到單一的結果集。借助對流數據的實時處理,您可執行一種持續查詢,比如列出在過去 10 分鐘內連網的所有用戶。此查詢將返回持續更新的結果。在靜態數據領域中,用戶猶如在干草堆中撈針;在流數據領域中,用戶可輕松地找到這顆針,因為干草已被吹走。

InfoSphere Streams 平臺支持流數據的實時處理,支持不斷更新持續查詢的結果,可在仍在移動的數據流中檢測洞察。
InfoSphere Streams 概述
InfoSphere Streams 旨在從一個幾分鐘到幾小時的窗口中的移動信息(數據流)中揭示有意義的模式。該平臺能夠獲取低延遲洞察,并為注重時效的應用程序(比如欺詐檢測或網絡管理)獲取更好的成果,從而提供業務價值。InfoSphere Streams 還可合并多個流,使您能夠從多個流中獲取新洞察,如圖 3 所示。

圖 3. 合并的流處理
InfoSphere Streams 的主要設計目的是:
- 快速響應事件和不斷變化的業務條件與需求。
- 支持以比現有系統快幾個數量級的速度對數據執行持續分析。
- 快速適應不斷變化的數據形式和類型。
- 管理新的流模式的高可用性、異構性和分布。
- 為共享的信息提供安全性和信息機密性。
InfoSphere Streams 提供了一種編程模型和 IDE 來定義數據來源,還提供了已融合到處理執行單元中的稱為運算符的軟件分析模塊。它還提供了基礎架構來支持從這些組件合成可擴展的流處理應用程序。主要平臺組件包括:
- 運行時環境:這包括平臺服務,以及一個用于在單個主機或一組集成的主機上部署和監視 Streams 應用程序的調度程序。
- 編程模型:您可使用 SPL(Streams Processing Language,流處理語言,一種聲明性語言)來編寫 Streams 應用程序。可使用該語言陳述您的需求,運行時環境會承擔確定如何最佳地服務該請求的責任。在此模型中,一個 Streams 應用程序表示為一個由運算符和連接它們的流組成的圖表。
- 監視工具和管理接口:Streams 應用程序處理數據的速度比普通的操作系統監視實用程序快得多。InfoSphere Streams 提供了可處理此環境的工具。
流處理語言
SPL,InfoSphere Streams 的編程語言,是一種分布式數據流合成語言。它是一種類似 C++ 或 Java™ 的可擴展且全功能的語言,支持用戶定義的數據類型。您可以使用 SPL 或原生語言(C++ 或 Java)編寫自定義函數。也可以使用 C++ 或 Java 編寫用戶定義的運算符。
InfoSphere Streams 持續應用程序會描述一個導向圖,該圖由各個互聯且處理多個數據流的運算符組成。數據流可來自系統外部,或者在應用程序內部生成。SPL 程序的基本構建塊包括:
- 流:一個無限的結構化元組序列。它可逐個元組地由運算符使用或通過一個窗口的定義來使用。
- 元組:屬性及其類型的一個結構化列表。流上的每個元組擁有由其流類型指定的形式。
- 流類型:指定元組中每個屬性的名稱和數據類型。
- 窗口:一個有限、有序的元組分組。它可以基于計數、時間、屬性值或標點符號。
- 運算符:SPL 的基礎構建塊,它的運算符會處理來自流的數據并可生成新流。
- 處理元素 (PE):基礎執行單元。一個 PE 可封裝單個運算符或多個合并的運算符。
- 作業:一個已部署好的用來執行的 Streams 應用程序。它由一個或多個 PE 組成。除了一組 PE 之外,SPL 編譯器還會生成一個 ADL(Application Description Language,應用程序描述語言)文件來描述應用程序的結構。該 ADL 文件包含每個 PE 的詳細信息,比如要加載和執行哪個二進制文件,調度限制、流格式和一個內部運算符數據流圖。
圖 4 演示了 SPL 程序的 InfoSphere Streams 運行時視圖:

圖 4. InfoSphere 運行時執行
一個運算符表示一個可重用的流轉換器,將一些輸入流轉換為輸出流。在 SPL 程序中,運算符調用可實現預算法的特定用途,使用分配的特定的輸入和輸出流,以及在本地指定的參數和邏輯。每次運算符調用都會對輸入和輸出流命名。各種內置的 InfoSphere Streams 運算符提供了許多強大的功能:
Source
:讀取流格式的輸入數據。Sink
:將輸出流的數據寫入外部存儲或系統中。Functor
:過濾、轉換和對輸入流的數據執行各種功能。Sort
:對定義的鍵上的流數據排序。Split
:將輸入流數據拆分為多個輸出流。Join
:合并定義的鍵上的輸入流數據。Aggregate
:聚合定義的鍵上的流數據。Barrier
:組合和匹配流數據。Delay
:演示一個流數據流。Punctor
:識別應一起處理的數據分組。
一個流連接到一個運算符的位置稱為端口。許多運算符(例如 Functor
)有一個輸入端口和一個輸出端口,但運算符也可以沒有輸入端口(比如 Source
)和沒有輸出端口(比如 Sink
),或者擁有多個輸入或輸出端口(比如 Split
和 Join
)。清單 1 給出了 Sink
的一個 SPL 示例,它有一個輸入端口并將輸出元組寫入到一個磁盤文件中。
清單 1. Sink
示例
() as Sink = FileSink(StreamIn) { param file : "/tmp/people.dat"; format : csv; flush : 20u; }
在 清單 1 中,file
是一個強制性參數,提供了輸出文件的路徑。flush
參數用于清除給定數量的元組后的輸出。format
參數指定了輸出文件的格式。
組合運算符是一個運算符集合。它表示對原始(非組合)運算符或組合(嵌套)運算符的一個子圖的一種封裝。它類似于過程語言中的宏。
一個應用程序由一個沒有輸入或輸出端口的主要組合運算符表示。數據可流入和流出,但不會流到一個圖表內的流上,而且流可導出到在同一個實例中運行的其他應用程序和從這些應用程序導入。清單 2 中的代碼給出了主要組合運算符的框架。
清單 2. 主要組合運算符的結構
composite Main { graph stream ... { } stream ... { } ... }
作為一個示例,我們來看一個簡單的流應用程序 WordCount,它統計一個文件中的行數和字數。該程序由以下流圖組成:
- 一個
Source
預算法調用,讀取一個文件并將各行發送給數據流。 - 一個
Functor
運算符調用,統計行數和每個數據行的字數,將統計數據發送給它的輸出流。 - 一個
Counter
運算符調用,聚合文件中所有行的統計數據并打印在末尾。
在介紹 WordCount 的主要組合運算符之前,我將定義一些幫助器。我將為一行的統計數據使用 LineStat
類型。此外,我需要構建一個 countWords(rstring line)
函數來統計一行中的字數,需要使用一個 addM(mutable LineStat x, LineStat y)
函數來添加兩個 LineStat
值并存儲結果。清單 3 定義了這些幫助器。
清單 3. WordCount 幫助器定義
type LineStat = tuple<int32 lines, int32 words>; int32 countWords(rstring line) { return size(tokenize(line, " \t", false)); } void addM(mutable LineStat x, LineStat y) { x.lines += y.lines; x.words += y.words; }
現在可以定義主要組合運算符了,如清單 4 所示。
清單 4. WordCount 的主要組合運算符
composite WordCount { graph stream<rstring line> Data = FileSource() { param file : getSubmissionTimeValue("file"); format : line; } stream<LineStat> OneLine = Functor(Data) { output OneLine : lines = 1, words = countWords(line); } () as Counter = Custom(OneLine) { logic state : mutable LineStat sum = { lines = 0, words = 0 }; onTuple OneLine : addM(sum, OneLine); onPunct OneLine : if (currentPunct() == Sys.FinalMarker) println(sum); } }
開發環境
InfoSphere Streams 提供了一個敏捷開發環境,該環境由 Eclipse IDE、Streams Live Graph 視圖和一個流調試器組成。該平臺還包含用于加速和簡化特定功能或行業的解決方案開發的工具包:
- 標準工具包:包含隨產品發布的默認運算符:
- 關系運算符,比如
Filter
、Sort
、Functor
、Join
、Punctor
和Aggregate
- 適配器 運算符,比如
FileSource
、FileSink
、DirectoryScan
和Export
- 實用程序運算符,比如
Custom Split
、DeDuplicate
、Throttle
、Union
、Delay
、ThreadedSplit
、Barrier
和DynamicFilter
- 關系運算符,比如
- 互聯網工具包:包括
HTTP
、FTP
、HTTPS
、FTPS
和RSS
等運算符。 - 數據庫工具包:支持 DBMS,包括 DB2®、Netezza、Oracle Database、SQL Server 和 MySQL。
- 其他內置工具包:金融、數據挖掘、大數據和文本工具包。
此外,您可定義您自己的工具包,提供可重用的運算符和函數集,并創建跨領域和特定于領域的加速器。它們可包含原始和組合運算符,也可同時使用原生和 SPL 函數。
BigInsights 和 InfoSphere Streams 之間的集成和交互
不斷從系統中生成大量寶貴數據的公司正面臨為以下兩個重要用途而分析數據的問題困擾:及時感知和響應當前事件,根據歷史知識進行預測,從而指導響應。這一情形產生了無縫運行移動數據(當前數據)和靜止數據(歷史數據)分析、處理海量、多樣性、高速產生的數據的需求。IBM 的移動數據 (InfoSphere Streams) 與靜止數據 (BigInsights) 平臺的集成解決了 3 個主要應用場景的需求:
- 可伸縮的數據獲取:通過 Streams 持續將數據獲取到 BigInsights 中。例如,通常需要獲取來自社交媒體來源(比如 Twitter 和 Facebook)的非結構化文本數據,以提取各種類型的態度和線索。在這種情況下,如果文本提取在獲取數據時執行,那么盡早消除垃圾郵件等無關數據會讓效率高很多。這種集成使公司能夠避免巨額的非必要存儲成本。
- 加速和充實:從 BigInsights 生辰歷史上下文來加速分析和充實傳入的 Streams 數據。BigInsights 可用于分析在較長的時間窗口內從各種連續和靜態的數據來源吸收和集成的數據。此分析的結果為各種在線分析提供了上下文,可用于將它們引導至一種已知狀態。回到社交媒體應用程序的場景,一條傳入的 Twitter 消息僅擁有發布該消息的人的 ID。但是,歷史數據可通過屬性(比如影響者)充實該信息,為執行下游分析以適當應對此用戶所表達的態度提供機會。
- 自適應分析模型:BigInsights 上的分析操作(比如數據挖掘、機器學習或統計建模)所生成的模型。這些可用作分析 Streams 上的傳入數據的基礎,基于實時觀察結果而更新。
IBM 大數據平臺的移動數據和靜止數據部分可通過 3 種主要的組件類型來集成:
- 通用分析:相同的分析功能可用在 Streams 和 BigInsights 上。
- 通用數據格式:Streams 格式運算符可在 Streams 元組格式和 BigInsights 使用的數據格式之間轉換數據。
- 數據交換適配器:Streams
Source
和Sink
適配器可用于與 BigInsights 交換數據。
結束語
幫助公司管理、分析和利用大數據是 IBM 大數據平臺的主要關注領域。本文介紹了 InfoSphere Streams,它是 IBM 用來存儲和分析移動數據(流數據)的軟件平臺。本文還概述了如何集成 InfoSphere Streams 與 BigInsights,它們是 IBM 用來存儲和分析靜止數據的軟件平臺,以便充實實現更復雜分析的能力。許多公司認識到,充分利用大數據是提供獨特的業務價值和優勢的一個重要的信息管理手段。如果您已準備好使用 InfoSphere streams,請參見 參考資料,獲取免費的培訓材料和軟件。
詳情請咨詢!
客服熱線:023-66090381