HDFS - Microsoft

10x increase
4.3 connected
VOLUME
VELOCITY
every 5 years
devices per adult
Relational
Data
VARIETY
85% from
new data types
Big Data
Tackling growth in the volume, velocity and variety of data
44% of
users (350M
Mobility
70% of U.S.
are now used an
smartphone
people) access
owners regularly
Facebook via
shop online via
mobile devices.
their devices.
50% of
Gaming consoles
average of
1.5 hrs/wk
to connect to
the Internet.
60% of U.S.
millennials use
33% of BI will
mobile data will
mobile devices
be audio and
to research
video streaming
products.
by 2014.
1 in 4
Mobility
Cloud
Social
Big
Data
be consumed
via handheld
devices
by 2013.
2/3 of the
world's mobile
data traffic will be
video by 2016.
Facebook
38% of
users add
people
to posts
their location
recommend a
brand they “like”
or follow
on a social
network.
500M
Tweets are
hosted on
Twitter each day
1.8 zettabytes
(2B/month).
of digital data
Brands get
100M
worldwide in
Facebook
from 2010.
“likes” per day.
were in use
2011, up 30%
80% growth
of unstructured
data is
predicted over
the next five
years.
機能
RDB
Big Data
データタイプ
構造化データ
非構造化データ
スキーマ
静的- 書き込み時に必要
動的 – 読み込み時
Read write パターン
read/writeの繰り返し
Writeは一回、Readの繰り返し
ストレージボリューム
Gigabytes to terabytes
スケーラビリティ
スケールアップ
Terabytes, petabytes, and
beyond
スケールアウト
エコノミクス
高価なハードウェアとソフト
ウェア
コモディティハードウェアと
オープンソース
(Still) Rapidly Evolving
Oozie
(Workflow)
Hive
(Warehouse
and Data
Access)
Chukwa
Apache
Mahout
Cassandra
Flume
HBase (Column DB)
MapReduce (Job Scheduling/Execution System)
Hadoop = MapReduce + HDFS
HDFS
(Hadoop Distributed File System)
Sqoop
Avro (Serialization)
Zookeeper (Coordination)
Pig (Data
Flow)
Traditional BI Tools
• ビッグデータの分析・レポーティングのための統一ストレージ
• 内部アプリケーションのために複数のデータセットを格納
• クラウドにより信頼性、弾力性、低コストを実現
• ストリームデータまたは非構造化データを既存のデータベースにロード
• ロードする前にクレンジング、転送、バリデーションを実行
• 定期的にデータの可視化またはレポートの作成
• 新しいタイプのデータを検証
• 少人数によるインタラクティブな分析
• レポートの作成や外部または内部データの可視化
• 外部データソースを内部の企業データウェアハウスと統合
• スケジュールされた間隔もしくはオンデマンドでデータ更新
• 外部データによりデータウェアハウスを強化
•
•
•
•
•
•
•
•
•
プラットフォーム: Traditional DWH/BI or HDInsight
ランタイム : HDInsight in the Cloud or on-premises
ストレージ: ASV or HDFS
データ収集 : File upload, StreamInsight, SSIS, Custom App
クエリー: MR, Pig, Hive, UDF or Hadoop Streaming
データ可視化: EXCEL , Sharepoint, LINQ to Hive or Custom app
レポーティング : SSRS, SQL Azure Reporting, Crystal Report etc.
DWH 統合: Sqoop, SSIS, Hive ODBC, PolyBase
他の要素 : ZooKeeper, Oozie, HCatalog, Mahout etc.
• アップローダー
• アップローダーはライブラリーとして実装し、スクリプトやSSISから呼び
出し可能とする
• コマンドラインユーティリティを作成してインタラクティブなアップロー
ドもサポートする
• ASVへのアップロード
• データを小さいサイズへと分割して同時アップロード
• AzureのBlobサイズ
ボトルネック
チューニングテクニック
大量の入力データによるストレージIO
データソースの圧縮
Mapper出力ステージでスピルアウトするレコードに Mapperからスピルアウトするレコードを減らす
よるストレージ I/O
大量のMapper出力によるネットワーク転送
Mapperの出力を圧縮
Combinerの実装
大量のReducer出力によるストレージIOまたはネット JOBの出力を圧縮
ワーク転送
レプリケーション設定の変更
Hive – クエリー出力の圧縮
間違ったコンフィグレーションによるタスクの不足
Map, ReduceタスクまたはJob slotsの数を増やす
タスクへのメモリアロケーション不足
メモリ設定を変更
データ偏在化による特定Reducerの負荷増大
偏在化の除去
http://WAG.Codeplex.com
http://www.windowsAzure.com
http://hadoop.apache.org
http://pnp.azurewebsites.net/en-us/
// Map function - runs on all nodes
var map = function (key, value, context) {
// split the data into an array of words
var hashtags = value.split(/[^0-9a-zA-Z#]/);
};
//Loop through the array, creating a value of 1 for each word beginning "#"
for (var i = 0; i < hashtags.length; i++) {
if (hashtags[i].substring(0, 1) == "#") {
context.write(hashtags[i].toLowerCase(), 1);
}
}
//Reduce function - runs on reducer node(s)
var reduce = function (key, values, context) {
var sum = 0;
// Sum the counts of each tag found in the map function
while (values.hasNext()) {
sum += parseInt(values.next());
}
context.write(key, sum);
};
-- load tweets
Tweets = LOAD 'asv://uploads/data' AS (date, id, author, tweet);
-- split tweet into words
TweetWords = FOREACH Tweets GENERATE date, FLATTEN(TOKENIZE(tweet)) AS tag, id;
--filter words to find hashtags
Tags = FILTER TweetWords BY tag matches '#.*';
-- clean tags by removing trailing periods
CleanTags = FOREACH Tags GENERATE date, LOWER(REPLACE(tag, '\\.', '')) as tag, id;
-- group tweets by date and tag
GroupedTweets = GROUP CleanTags BY (date, tag);
-- count tag mentions per group
CountedTagMentions = FOREACH GroupedTweets GENERATE group, COUNT(CleanTags.id) as
mentions;
-- flatten the group to generate columns
TagMentions = FOREACH CountedTagMentions GENERATE FLATTEN(group) as (date, tag),
mentions;
-- load the top tags found by map/reduce previously
TopTags = LOAD 'asv://results/countedtags/part-r-00000' AS (toptag, totalcount:long);
-- Join tweets and top tags based on matching tag
TagMentionsAndTopTags = JOIN TagMentions BY tag, TopTags BY toptag;
-- get the date, tag, totalcount, and mentions columns
TagMentionsAndTotals = FOREACH TagMentionsAndTopTags GENERATE date, tag, totalcount,
mentions;
-- sort by date and mentions
SortedTagMentionsAndTotals = ORDER TagMentionsAndTotals BY date, mentions;
-- store the results as a file
STORE SortedTagMentionsAndTotals INTO 'asv://results/dailytagcounts';
CREATE EXTERNAL TABLE dailytwittertags
(tweetdate STRING,
tag STRING,
totalcount INT,
daycount INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE LOCATION 'asv://tables/dailytagcount'