什么是Apache Pig?

Apache Pig是MapReduce的一个抽象。它是一个工具/平台,用于分析较大的数据集,并将它们表示为数据流。Pig通常与 Hadoop 一起使用;我们可以使用Apache Pig在Hadoop中执行所有的数据处理操作。

要编写数据分析程序,Pig提供了一种称为 Pig Latin 的高级语言。该语言提供了各种操作符,程序员可以利用它们开发自己的用于读取,写入和处理数据的功能。

要使用 Apache Pig 分析数据,程序员需要使用Pig Latin语言编写脚本。所有这些脚本都在内部转换为Map和Reduce任务。Apache Pig有一个名为 Pig Engine 的组件,它接受Pig Latin脚本作为输入,并将这些脚本转换为MapReduce作业。

为什么我们需要Apache Pig?

不太擅长Java的程序员通常习惯于使用Hadoop,特别是在执行任一MapReduce作业时。Apache Pig是所有这样的程序员的福音。

  • 使用 Pig Latin ,程序员可以轻松地执行MapReduce作业,而无需在Java中键入复杂的代码。

  • Apache Pig使用多查询方法,从而减少代码长度。例如,需要在Java中输入200行代码(LoC)的操作在Apache Pig中输入少到10个LoC就能轻松完成。最终,Apache Pig将开发时间减少了近16倍。

  • Pig Latin是类似SQL的语言,当你熟悉SQL后,很容易学习Apache Pig。

  • Apache Pig提供了许多内置操作符来支持数据操作,如join,filter,ordering等。此外,它还提供嵌套数据类型,例如tuple(元组),bag(包)和MapReduce缺少的map(映射)。

Apache Pig的特点

Apache Pig具有以下特点:

  • 丰富的运算符集 - 它提供了许多运算符来执行诸如join,sort,filer等操作。

  • 易于编程 - Pig Latin与SQL类似,如果你善于使用SQL,则很容易编写Pig脚本。

  • 优化机会 - Apache Pig中的任务自动优化其执行,因此程序员只需要关注语言的语义。

  • 可扩展性 - 使用现有的操作符,用户可以开发自己的功能来读取、处理和写入数据。

  • 用户定义函数 - Pig提供了在其他编程语言(如Java)中创建用户定义函数的功能,并且可以调用或嵌入到Pig脚本中。

  • 处理各种数据 - Apache Pig分析各种数据,无论是结构化还是非结构化,它将结果存储在HDFS中。

Apache Pig与MapReduce

下面列出的是Apache Pig和MapReduce之间的主要区别。

Apache PigMapReduce
Apache Pig是一种数据流语言。MapReduce是一种数据处理模式。
它是一种高级语言。MapReduce是低级和刚性的。
在Apache Pig中执行Join操作非常简单。在MapReduce中执行数据集之间的Join操作是非常困难的。
任何具备SQL基础知识的新手程序员都可以方便地使用Apache Pig工作。向Java公开是必须使用MapReduce。
Apache Pig使用多查询方法,从而在很大程度上减少代码的长度。MapReduce将需要几乎20倍的行数来执行相同的任务。
没有必要编译。执行时,每个Apache Pig操作符都在内部转换为MapReduce作业。MapReduce作业具有很长的编译过程。

Apache Pig Vs SQL

下面列出了Apache Pig和SQL之间的主要区别。

PigSQL
Pig Latin是一种程序语言。SQL是一种声明式语言。
在Apache Pig中,模式是可选的。我们可以存储数据而无需设计模式(值存储为$ 01,$ 02等)模式在SQL中是必需的。
Apache Pig中的数据模型是嵌套关系SQL 中使用的数据模型是平面关系
Apache Pig为查询优化提供有限的机会。在SQL中有更多的机会进行查询优化。

除了上面的区别,Apache Pig Latin:

  • 允许在pipeline(流水线)中拆分。
  • 允许开发人员在pipeline中的任何位置存储数据。
  • 声明执行计划。
  • 提供运算符来执行ETL(Extract提取,Transform转换和Load加载)功能。

Apache Pig VS Hive

Apache Pig和Hive都用于创建MapReduce作业。在某些情况下,Hive以与Apache Pig类似的方式在HDFS上运行。在下表中,我们列出了几个重要的点区分Apache Pig与Hive。

Apache PigHive
Apache Pig使用一种名为 Pig Latin 的语言(最初创建于 Yahoo )。Hive使用一种名为 HiveQL 的语言(最初创建于 Facebook )。
Pig Latin是一种数据流语言。HiveQL是一种查询处理语言。
Pig Latin是一个过程语言,它适合流水线范式。HiveQL是一种声明性语言。
Apache Pig可以处理结构化,非结构化和半结构化数据。Hive主要用于结构化数据。

Apache Pig的应用程序

Apache Pig通常被数据科学家用于执行涉及特定处理和快速原型设计的任务。使用Apache Pig:

  • 处理巨大的数据源,如Web日志。
  • 为搜索平台执行数据处理。
  • 处理时间敏感数据的加载。

Apache Pig历史

2006年 时,Apache Pig是作为Yahoo的研究项目开发的,特别是在每个数据集上创建和执行MapReduce作业。 2007 时,Apache Pig是通过Apache孵化器开源的。 2008 时,Apache Pig的第一个版本出来了。 2010 时,Apache Pig获得为Apache顶级项目。

用于使用Pig分析Hadoop中的数据的语言称为 Pig Latin ,是一种高级数据处理语言,它提供了一组丰富的数据类型和操作符来对数据执行各种操作。

要执行特定任务时,程序员使用Pig,需要用Pig Latin语言编写Pig脚本,并使用任何执行机制(Grunt Shell,UDFs,Embedded)执行它们。执行后,这些脚本将通过应用Pig框架的一系列转换来生成所需的输出。

在内部,Apache Pig将这些脚本转换为一系列MapReduce作业,因此,它使程序员的工作变得容易。Apache Pig的架构如下所示。

Apache Pig架构

Apache Pig组件

如图所示,Apache Pig框架中有各种组件。让我们来看看主要的组件。

Parser(解析器)

最初,Pig脚本由解析器处理,它检查脚本的语法,类型检查和其他杂项检查。解析器的输出将是DAG(有向无环图),它表示Pig Latin语句和逻辑运算符。在DAG中,脚本的逻辑运算符表示为节点,数据流表示为边。

Optimizer(优化器)

逻辑计划(DAG)传递到逻辑优化器,逻辑优化器执行逻辑优化,例如投影和下推。

Compiler(编译器)

编译器将优化的逻辑计划编译为一系列MapReduce作业。

Execution engine(执行引擎)

最后,MapReduce作业以排序顺序提交到Hadoop。这些MapReduce作业在Hadoop上执行,产生所需的结果。

Pig Latin数据模型

Pig Latin的数据模型是完全嵌套的,它允许复杂的非原子数据类型,例如 map tuple 下面给出了Pig Latin数据模型的图形表示。

Pig Latin数据模型

Atom(原子)

Pig Latin中的任何单个值,无论其数据类型,都称为 Atom 。它存储为字符串,可以用作字符串和数字。int,long,float,double,chararray和bytearray是Pig的原子值。一条数据或一个简单的原子值被称为字段:“raja“或“30"

Tuple(元组)

由有序字段集合形成的记录称为元组,字段可以是任何类型。元组与RDBMS表中的行类似。例:(Raja,30)

Bag(包)

一个包是一组无序的元组。换句话说,元组(非唯一)的集合被称为包。每个元组可以有任意数量的字段(灵活模式)。包由“{}"表示。它类似于RDBMS中的表,但是与RDBMS中的表不同,不需要每个元组包含相同数量的字段,或者相同位置(列)中的字段具有相同类型。

:{(Raja,30),(Mohammad,45)}

包可以是关系中的字段;在这种情况下,它被称为内包(inner bag)

:{Raja,30, {9848022338,raja@gmail.com,} }

Map(映射)

映射(或数据映射)是一组key-value对。key需要是chararray类型,且应该是唯一的。value可以是任何类型,它由“[]"表示,

:[name#Raja,age#30]

Relation(关系)

一个关系是一个元组的包。Pig Latin中的关系是无序的(不能保证按任何特定顺序处理元组)。

本章将介绍如何在系统中下载,安装和设置 Apache Pig

先决条件

在你运行Apache Pig之前,必须在系统上安装好Hadoop和Java。因此,在安装Apache Pig之前,请按照以下链接中提供的步骤安装Hadoop和Java://www.51coolma.cn/hadoop/hadoop_enviornment_setup.htm

下载Apache Pig

首先,从以下网站下载最新版本的Apache Pig:https://pig.apache.org/

步骤1

打开Apache Pig网站的主页。News部分下,点击链接release page,如下面的快照所示。

Home Page

步骤2

点击指定的链接后,你将被重定向到 Apache Pig Releases 页面。在此页面的Download部分下,单击链接,然后你将被重定向到具有一组镜像的页面。

Apache Pig Releases

步骤3

选择并单击这些镜像中的任一个,如下所示。

click mirrors

步骤4

这些镜像将带您进入 Pig Releases 页面。 此页面包含Apache Pig的各种版本。 单击其中的最新版本。

Pig Release

步骤5

在这些文件夹中,有发行版中的Apache Pig的源文件和二进制文件。下载Apache Pig 0.16, pig0.16.0-src.tar.gz pig-0.16.0.tar.gz 的源和二进制文件的tar文件。

Pig Index

安装Apache Pig

下载Apache Pig软件后,按照以下步骤将其安装在Linux环境中。

步骤1

在安装了 Hadoop,Java和其他软件的安装目录的同一目录中创建一个名为Pig的目录。(在我们的教程中,我们在名为Hadoop的用户中创建了Pig目录)。

$ mkdir Pig

第2步

提取下载的tar文件,如下所示。

$ cd Downloads/ $ tar zxvf pig-0.15.0-src.tar.gz $ tar zxvf pig-0.15.0.tar.gz 

步骤3

pig-0.16.0-src.tar.gz 文件的内容移动到之前创建的 Pig 目录,如下所示。

$ mv pig-0.16.0-src.tar.gz/* /home/Hadoop/Pig/

配置Apache Pig

安装Apache Pig后,我们必须配置它。要配置,我们需要编辑两个文件 - bashrcpig.properties

.bashrc文件

.bashrc 文件中,设置以下变量

  • PIG_HOME 文件夹复制到Apache Pig的安装文件夹

  • PATH 环境变量复制到bin文件夹

  • PIG_CLASSPATH 环境变量复制到安装Hadoop的etc(配置)文件夹(包含core-site.xml,hdfs-site.xml和mapred-site.xml文件的目录)。

export PIG_HOME = /home/Hadoop/Pigexport PATH  = PATH:/home/Hadoop/pig/binexport PIG_CLASSPATH = $HADOOP_HOME/conf

pig.properties文件

在Pig的 conf 文件夹中,我们有一个名为 pig.properties 的文件。在pig.properties文件中,可以设置如下所示的各种参数。

pig -h properties 

支持以下属性:

Logging: verbose = true|false; default is false. This property is the same as -v       switch brief=true|false; default is false. This property is the same        as -b switch debug=OFF|ERROR|WARN|INFO|DEBUG; default is INFO.                    This property is the same as -d switch aggregate.warning = true|false; default is true.        If true, prints count of warnings of each type rather than logging each warning.		 		 Performance tuning: pig.cachedbag.memusage=<mem fraction>; default is 0.2 (20% of all memory).       Note that this memory is shared across all large bags used by the application.                pig.skewedjoin.reduce.memusagea=<mem fraction>; default is 0.3 (30% of all memory).       Specifies the fraction of heap available for the reducer to perform the join.       pig.exec.nocombiner = true|false; default is false.           Only disable combiner as a temporary workaround for problems.                opt.multiquery = true|false; multiquery is on by default.           Only disable multiquery as a temporary workaround for problems.       opt.fetch=true|false; fetch is on by default.           Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.                pig.tmpfilecompression = true|false; compression is off by default.                        Determines whether output of intermediate jobs is compressed.                pig.tmpfilecompression.codec = lzo|gzip; default is gzip.           Used in conjunction with pig.tmpfilecompression. Defines compression type.                pig.noSplitCombination = true|false. Split combination is on by default.           Determines if multiple small files are combined into a single map.         			         pig.exec.mapPartAgg = true|false. Default is false.                        Determines if partial aggregation is done within map phase, before records are sent to combiner.                pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.                        If the in-map partial aggregation does not reduce the output num records by this factor, it gets disabled.			  Miscellaneous: exectype = mapreduce|tez|local; default is mapreduce. This property is the same as -x switch       pig.additional.jars.uris=<comma seperated list of jars>. Used in place of register command.       udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.       stop.on.failure = true|false; default is false. Set to true to terminate on the first error.                pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.           Determines the timezone used to handle datetime datatype and UDFs.Additionally, any Hadoop property can be specified.

验证安装

通过键入version命令验证Apache Pig的安装。如果安装成功,你将获得Apache Pig的正式版本,如下所示。

$ pig –version  Apache Pig version 0.16.0 (r1682971)  compiled Jun 01 2015, 11:44:35


在上一章中,我们解释了如何安装Apache Pig。在本章中,我们将讨论如何执行Apache Pig。

Apache Pig执行模式

你可以以两种模式运行Apache Pig,即Local(本地)模式HDFS模式

Local模式

在此模式下,所有文件都从本地主机和本地文件系统安装和运行,不需要Hadoop或HDFS。此模式通常用于测试目的。

MapReduce模式

MapReduce模式是我们使用Apache Pig加载或处理Hadoop文件系统(HDFS)中存在的数据的地方。在这种模式下,每当我们执行Pig Latin语句来处理数据时,会在后端调用一个MapReduce作业,以对HDFS中存在的数据执行特定的操作。

Apache Pig执行机制

Apache Pig脚本可以通过三种方式执行,即交互模式,批处理模式和嵌入式模式。

  • 交互模式(Grunt shell) - 你可以使用Grunt shell以交互模式运行Apache Pig。在此shell中,你可以输入Pig Latin语句并获取输出(使用Dump运算符)。

  • 批处理模式(脚本) - 你可以通过将Pig Latin脚本写入具有 .pig 扩展名的单个文件中,以批处理模式运行Apache Pig。

  • 嵌入式模式(UDF) - Apache Pig允许在Java等编程语言中定义我们自己的函数(UDF用户定义函数),并在我们的脚本中使用它们。

调用Grunt Shell

你可以使用“-x"选项以所需的模式(local/MapReduce)调用Grunt shell,如下所示。

Local模式MapReduce模式

Command(命令) -

$ ./pig -x local

Command(命令)-

$ ./pig -x mapreduce

Output(输出) -

Local Mode Output

Output(输出)-

MapReduce Mode Output

这两个命令都给出了Grunt shell提示符,如下所示。

grunt>

你可以使用“ctrl+d"退出Grunt shell。

在调用Grunt shell之后,可以通过直接输入Pig中的Pig Latin语句来执行Pig脚本。

grunt> customers = LOAD 'customers.txt' USING PigStorage(',');

在批处理模式下执行Apache Pig

你可以在文件中编写整个Pig Latin脚本,并使用 -x command 执行它。我们假设在一个名为 sample_script.pig 的文件中有一个Pig脚本,如下所示。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING   PigStorage(',') as (id:int,name:chararray,city:chararray);  Dump student;

现在,你可以在上面的文件中执行脚本,如下所示。

Local模式MapReduce模式
$ pig -x local Sample_script.pig $ pig -x mapreduce Sample_script.pig

注意:我们将详细讨论如何在批处理模式嵌入模式中运行Pig脚本。

调用Grunt shell后,可以在shell中运行Pig脚本。除此之外,还有由Grunt shell提供的一些有用的shell和实用程序命令。本章讲解的是Grunt shell提供的shell和实用程序命令。

注意:在本章的某些部分中,使用了LoadStore等命令。请参阅相应章节以获取有关它们的详细信息。

Shell 命令

Apache Pig的Grunt shell主要用于编写Pig Latin脚本。在此之前,我们可以使用 sh fs 来调用任何shell命令。

sh 命令

使用 sh 命令,我们可以从Grunt shell调用任何shell命令,但无法执行作为shell环境( ex - cd)一部分的命令。

语法

下面给出了 sh 命令的语法。

grunt> sh shell command parameters

示例

我们可以使用 sh 选项从Grunt shell中调用Linux shell的 ls 命令,如下所示。在此示例中,它列出了 /pig/bin/ 目录中的文件。

grunt> sh ls   pig pig_1444799121955.log pig.cmd pig.py

fs命令

使用 fs 命令,我们可以从Grunt shell调用任何FsShell命令。

语法

下面给出了 fs 命令的语法。

grunt> sh File System command parameters

示例

我们可以使用fs命令从Grunt shell调用HDFS的ls命令。在以下示例中,它列出了HDFS根目录中的文件。

grunt> fs –ls  Found 3 itemsdrwxrwxrwx   - Hadoop supergroup          0 2015-09-08 14:13 Hbasedrwxr-xr-x   - Hadoop supergroup          0 2015-09-09 14:52 seqgen_datadrwxr-xr-x   - Hadoop supergroup          0 2015-09-08 11:30 twitter_data

以同样的方式,我们可以使用 fs 命令从Grunt shell中调用所有其他文件系统的shell命令。

实用程序命令

Grunt shell提供了一组实用程序命令。这些包括诸如clear,help,history,quitset等实用程序命令;以及Grunt shell中诸如 exec,killrun等命令来控制Pig。下面给出了Grunt shell提供的实用命令的描述。

clear命令

clear 命令用于清除Grunt shell的屏幕。

语法

你可以使用 clear 命令清除grunt shell的屏幕,如下所示。

grunt> clear

help命令

help 命令提供了Pig命令或Pig属性的列表。

使用

你可以使用 help 命令获取Pig命令列表,如下所示。

grunt> helpCommands: <pig latin statement>; - See the PigLatin manual for details:http://hadoop.apache.org/pig  File system commands:fs <fs arguments> - Equivalent to Hadoop dfs  command:http://hadoop.apache.org/common/docs/current/hdfs_shell.html	 Diagnostic Commands:describe <alias>[::<alias] - Show the schema for the alias.Inner aliases can be described as A::B.    explain [-script <pigscript>] [-out <path>] [-brief] [-dot|-xml]        [-param <param_name>=<pCram_value>]       [-param_file <file_name>] [<alias>] -        Show the execution plan to compute the alias or for entire script.       -script - Explain the entire script.       -out - Store the output into directory rather than print to stdout.       -brief - Don't expand nested plans (presenting a smaller graph for overview).       -dot - Generate the output in .dot format. Default is text format.       -xml - Generate the output in .xml format. Default is text format.       -param <param_name - See parameter substitution for details.       -param_file <file_name> - See parameter substitution for details.       alias - Alias to explain.       dump <alias> - Compute the alias and writes the results to stdout.Utility Commands: exec [-param <param_name>=param_value] [-param_file <file_name>] <script> -       Execute the script with access to grunt environment including aliases.       -param <param_name - See parameter substitution for details.       -param_file <file_name> - See parameter substitution for details.       script - Script to be executed.    run [-param <param_name>=param_value] [-param_file <file_name>] <script> -       Execute the script with access to grunt environment.		 -param <param_name - See parameter substitution for details.                -param_file <file_name> - See parameter substitution for details.       script - Script to be executed.    sh  <shell command> - Invoke a shell command.    kill <job_id> - Kill the hadoop job specified by the hadoop job id.    set <key> <value> - Provide execution parameters to Pig. Keys and values are case sensitive.       The following keys are supported:       default_parallel - Script-level reduce parallelism. Basic input size heuristics used        by default.       debug - Set debug on or off. Default is off.       job.name - Single-quoted name for jobs. Default is PigLatin:<script name>            job.priority - Priority for jobs. Values: very_low, low, normal, high, very_high.       Default is normal stream.skippath - String that contains the path.       This is used by streaming any hadoop property.    help - Display this message.    history [-n] - Display the list statements in cache.       -n Hide line numbers.    quit - Quit the grunt shell. 

history命令

此命令显示自Grunt shell被调用以来执行/使用的语句的列表。

使用

假设我们自打开Grunt shell之后执行了三个语句。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(','); grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(','); grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(','); 

然后,使用 history 命令将产生以下输出。

grunt> historycustomers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',');   orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',');   student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(','); 

set命令

set 命令用于向Pig中使用的key显示/分配值。

使用

使用此命令,可以将值设置到以下key。

Key说明和值
default_parallel通过将任何整数作为值传递给此key来设置映射作业的reducer数。
debug关闭或打开Pig中的调试功能通过传递on/off到这个key。
job.name通过将字符串值传递给此key来将作业名称设置为所需的作业。
job.priority

通过将以下值之一传递给此key来设置作业的优先级:

  • very_low
  • low
  • normal
  • high
  • very_high
stream.skippath对于流式传输,可以通过将所需的路径以字符串形式传递到此key,来设置不传输数据的路径。

quit命令

你可以使用此命令从Grunt shell退出。

使用

从Grunt shell中退出,如下所示。

grunt> quit

现在让我们看看从Grunt shell控制Apache Pig的命令。

exec命令

使用 exec 命令,我们可以从Grunt shell执行Pig脚本。

语法

下面给出了实用程序命令 exec 的语法。

grunt> exec [–param param_name = param_value] [–param_file file_name] [script]

示例

我们假设在HDFS的 /pig_data/ 目录中有一个名为 student.txt 的文件,其中包含以下内容。

Student.txt

001,Rajiv,Hyderabad002,siddarth,Kolkata003,Rajesh,Delhi

并且,假设我们在HDFS的 /pig_data/ 目录中有一个名为 sample_script.pig 的脚本文件,并具有以下内容。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(',')    as (id:int,name:chararray,city:chararray);  Dump student;

现在,让我们使用 exec 命令从Grunt shell中执行上面的脚本,如下所示。

grunt> exec /sample_script.pig

输出

exec 命令执行 sample_script.pig 中的脚本。按照脚本中的指示,它会将 student.txt 文件加载到Pig中,并显示Dump操作符的结果,显示以下内容。

(1,Rajiv,Hyderabad)(2,siddarth,Kolkata)(3,Rajesh,Delhi) 

kill命令

你可以使用此命令从Grunt shell中终止一个作业。

语法

下面给出了 kill 命令的语法。

grunt> kill JobId

示例

假设有一个具有id Id_0055 的正在运行的Pig作业,使用 kill 命令从Grunt shell中终止它,如下所示。

grunt> kill Id_0055

run命令

你可以使用run命令从Grunt shell运行Pig脚本

语法

下面给出了 run 命令的语法。

grunt> run [–param param_name = param_value] [–param_file file_name] script

示例

假设在HDFS的 /pig_data/ 目录中有一个名为 student.txt 的文件,其中包含以下内容。

Student.txt

001,Rajiv,Hyderabad002,siddarth,Kolkata003,Rajesh,Delhi

并且,假设我们在本地文件系统中有一个名为 sample_script.pig 的脚本文件,并具有以下内容。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING   PigStorage(',') as (id:int,name:chararray,city:chararray);

现在,让我们使用run命令从Grunt shell运行上面的脚本,如下所示。

grunt> run /sample_script.pig

你可以使用Dump操作符查看脚本的输出,如下所示。

grunt> Dump;(1,Rajiv,Hyderabad)(2,siddarth,Kolkata)(3,Rajesh,Delhi)

注意: exec run 命令之间的区别是,如果使用run,则脚本中的语句在history命令中可用。

Pig Latin是用于使用Apache Pig分析Hadoop中数据的语言。在本章中,我们将讨论Pig Latin的基础知识,如Pig Latin语句,数据类型,通用运算符,关系运算符和Pig Latin UDF。

Pig Latin - 数据模型

如前面章节所讨论的,Pig的数据模型是完全嵌套的。Relation是Pig Latin数据模型的最外层结构。它是一个其中:

  • 包是元组的集合。
  • 元组是有序的字段集。
  • 字段是一段数据。

Pig Latin - 语句

在使用Pig Latin处理数据时,语句是基本结构。

  • 这些语句使用关系(relation),它们包括表达式(expression)模式(schema)

  • 每个语句以分号(;)结尾。

  • 我们将使用Pig Latin提供的运算符通过语句执行各种操作。

  • 除了LOAD和STORE,在执行所有其他操作时,Pig Latin语句采用关系作为输入,并产生另一个关系作为输出。

  • 只要在Grunt shell中输入 Load 语句,就会执行语义检查。要查看模式的内容,需要使用 Dump 运算符。只有在执行 dump 操作后,才会执行将数据加载到文件系统的MapReduce作业。

例子

下面给出一个Pig Latin语句,它将数据加载到Apache Pig中。

grunt> Student_data = LOAD 'student_data.txt' USING PigStorage(',')as    ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

Pig Latin - 数据类型

下面给出的表描述了Pig Latin数据类型。

序号数据类型说明&示例
1int

表示有符号的32位整数。

示例:8

2long

表示有符号的64位整数。

示例:5L

3float

表示有符号的32位浮点。

示例:5.5F

4double

表示64位浮点。

示例:10.5

5chararray

表示Unicode UTF-8格式的字符数组(字符串)。

示例:‘51coolma’

6Bytearray

表示字节数组(blob)。

7Boolean

表示布尔值。

示例:true / false。

8Datetime

表示日期时间。

示例:1970-01-01T00:00:00.000 + 00:00

9Biginteger

表示Java BigInteger。

示例:60708090709

10Bigdecimal

表示Java BigDecimal

示例:185.98376256272893883

复杂类型
11Tuple

元组是有序的字段集。

示例:(raja,30)

12Bag

包是元组的集合。

示例:{(raju,30),(Mohhammad,45)}

13Map

地图是一组键值对。

示例:['name'#'Raju','age'#30]

Null值

所有上述数据类型的值可以为NULL。Apache Pig以与SQL类似的方式处理空值。null可以是未知值或不存在值,它用作可选值的占位符。这些空值可以自然出现或者可以是操作的结果。

Pig Latin - 算术运算符

下表描述了Pig Latin的算术运算符。假设a = 10和b = 20。

运算符描述示例
+

 - 运算符的两侧的值相加

a+b将得出30

 - 从运算符左边的数中减去右边的数

a-b将得出-10
*

 - 运算符两侧的值相乘

a*b将得出200
/

- 用运算符左边的数除右边的数

b / a将得出2
%

系数 - 用运算符右边的数除左边的数并返回余数

b%a将得出0
:

Bincond - 评估布尔运算符。它有三个操作数,如下所示。

变量 x =(expression)? value1 (如果为true): value2(如果为false)。

b =(a == 1)? 20:30;

如果a = 1,则b的值为20。

如果a!= 1,则b的值为30。

CASE

WHEN

THEN

ELSE

END

Case - case运算符等效于嵌套的bincond运算符。

CASE f2 % 2

WHEN  0

THEN

'even'


WHEN  1

THEN

'odd'

END

Pig Latin - 比较运算符

下表描述了Pig Latin的比较运算符。

运算符描述示例
==

等于 - 检查两个数的值是否相等;如果是,则条件变为true。

(a = b)不为true。
!=

不等于 - 检查两个数的值是否相等。如果值不相等,则条件为true。

(a!= b)为true。
>

大于 - 检查左边数的值是否大于右边数的值。 如果是,则条件变为true。

(a> b)不为true
<

小于 - 检查左边数的值是否小于右边数的值。 如果是,则条件变为true。

(a<b)为true。
>=

大于或等于 - 检查左边数的值是否大于或等于右边数的值。如果是,则条件变为true。

(a>=b)不为true。
<=

小于或等于 - 检查左边数的值是否小于或等于右边数的值。如果是,则条件变为true。

(a<=b)为true。
matches

模式匹配 - 检查左侧的字符串是否与右侧的常量匹配。

f1 matches '.* tutorial.*'

Pig Latin - 类型结构运算符

下表描述了Pig Latin的类型结构运算符。

运算符描述示例
()

元组构造函数运算符 - 此运算符用于构建元组。

(Raju,30)
{}

包构造函数运算符 - 此运算符用于构造包。

{(Raju,30),(Mohammad,45)}
[]

映射构造函数运算符 - 此运算符用于构造一个映射。

[name#Raja,age#30]

Pig Latin - 关系运算符

下表描述了Pig Latin的关系运算符。

运算符描述
加载和存储
LOAD将数据从文件系统(local/ HDFS)加载到关系中。
STORE将数据从文件系统(local/ HDFS)存储到关系中。
过滤
FILTER从关系中删除不需要的行。
DISTINCT从关系中删除重复行。
FOREACH,GENERATE基于数据列生成数据转换。
STREAM使用外部程序转换关系。
分组和连接
JOIN连接两个或多个关系。
COGROUP将数据分组为两个或多个关系。
GROUP在单个关系中对数据进行分组。
CROSS创建两个或多个关系的向量积。
排序
ORDER基于一个或多个字段(升序或降序)按排序排列关系。
LIMIT从关系中获取有限数量的元组。
合并和拆分
UNION将两个或多个关系合并为单个关系。
SPLIT将单个关系拆分为两个或多个关系。
诊断运算符
DUMP在控制台上打印关系的内容。
DESCRIBE描述关系的模式。
EXPLAIN查看逻辑,物理或MapReduce执行计划以计算关系。
ILLUSTRATE查看一系列语句的分步执行。

一般来说,Apache Pig在Hadoop之上工作。它是一种分析工具,用于分析 Hadoop File System中存在的大型数据集。要使用Apache Pig分析数据,我们必须首先将数据加载到Apache Pig中。本章介绍如何从HDFS将数据加载到Apache Pig。

准备HDFS

在MapReduce模式下,Pig从HDFS读取(加载)数据并将结果存回HDFS。因此,让我们先从HDFS开始,在HDFS中创建以下示例数据。

学生ID名字姓氏电话号码城市
001RajivReddy9848022337Hyderabad
002siddarthBattacharya9848022338Kolkata
003RajeshKhanna9848022339Delhi
004PreethiAgarwal9848022330Pune
005TrupthiMohanthy9848022336Bhuwaneshwar
006ArchanaMishra9848022335Chennai

上述数据集包含六个学生的个人详细信息,如id,名字,姓氏,电话号码和城市。

步骤1:验证Hadoop

首先,使用Hadoop version命令验证安装,如下所示。

$ hadoop version

如果你的系统里有Hadoop,并且已设置PATH变量,那么你将获得以下输出 -

Hadoop 2.6.0 Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 Compiled by jenkins on 2014-11-13T21:10Z Compiled with protoc 2.5.0 From source with checksum 18e43357c8f927c0695f1e9522859d6a This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoopcommon-2.6.0.jar

步骤2:启动HDFS

浏览Hadoop的 sbin 目录,并启动 yarn 和Hadoop dfs(分布式文件系统),如下所示。

cd /$Hadoop_Home/sbin/ $ start-dfs.sh localhost: starting namenode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-namenode-localhost.localdomain.out localhost: starting datanode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-datanode-localhost.localdomain.out Starting secondary namenodes [0.0.0.0] starting secondarynamenode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoopsecondarynamenode-localhost.localdomain.out $ start-yarn.sh starting yarn daemons starting resourcemanager, logging to /home/Hadoop/hadoop/logs/yarn-Hadoopresourcemanager-localhost.localdomain.out localhost: starting nodemanager, logging to /home/Hadoop/hadoop/logs/yarnHadoop-nodemanager-localhost.localdomain.out

步骤3:在HDFS中创建目录

在Hadoop DFS中,可以使用 mkdir 命令创建目录。在HDFS所需路径中创建一个名为 Pig_Data 的新目录,如下所示。

$cd /$Hadoop_Home/bin/ $ hdfs dfs -mkdir hdfs://localhost:9000/Pig_Data 

步骤4:将数据放在HDFS中

Pig的输入文件包含单个行中的每个元组/记录。记录的实体由分隔符分隔(在我们的示例中,我们使用“,”)。在本地文件系统中,创建一个包含数据的输入文件 student_data.txt ,如下所示。

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

现在,使用 put 命令将文件从本地文件系统移动到HDFS,如下所示。(你也可以使用 copyFromLocal 命令。)

$ cd $HADOOP_HOME/bin $ hdfs dfs -put /home/Hadoop/Pig/Pig_Data/student_data.txt dfs://localhost:9000/pig_data/

验证文件

使用 cat 命令验证文件是否已移入HDFS,如下所示。

$ cd $HADOOP_HOME/bin$ hdfs dfs -cat hdfs://localhost:9000/pig_data/student_data.txt

输出

现在,可以看到文件的内容,如下所示。

15/10/01 12:16:55 WARN util.NativeCodeLoader: Unable to load native-hadooplibrary for your platform... using builtin-java classes where applicable  001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai

Load运算符

你可以使用 Pig Latin LOAD 运算符,从文件系统(HDFS / Local)将数据加载到Apache Pig中。

语法

load语句由两部分组成,用“=”运算符分隔。在左侧,需要提到我们想要存储数据的关系的名称;而在右侧,我们需要定义如何存储数据。下面给出了 Load 运算符的语法。

Relation_name = LOAD 'Input file path' USING function as schema;

说明:

  • relation_name - 我们必须提到要存储数据的关系。

  • Input file path - 我们必须提到存储文件的HDFS目录。(在MapReduce模式下)

  • function - 我们必须从Apache Pig提供的一组加载函数中选择一个函数 BinStorage,JsonLoader,PigStorage,TextLoader 

  • Schema - 我们必须定义数据的模式,可以定义所需的模式如下 -

(column1 : data type, column2 : data type, column3 : data type);

注意:我们加载数据而不指定模式。在这种情况下,列将被寻址为$01,$02,等...(检查)。

例如,我们使用 LOAD 命令,在名为学生的模式下在Pig中的 student_data.txt 加载数据。

启动Pig Grunt Shell

首先,打开Linux终端。在MapReduce模式下启动Pig Grunt shell,如下所示。

$ Pig –x mapreduce

它将启动Pig Grunt shell,如下所示。

15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE15/10/01 12:33:37 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType2015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015, 11:44:352015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/Hadoop/pig_1443683018078.log2015-10-01 12:33:38,242 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/Hadoop/.pigbootup not found  2015-10-01 12:33:39,630 [main]INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000 grunt>

执行Load语句

现在,通过在Grunt shell中执行以下Pig Latin语句,将文件 student_data.txt 中的数据加载到Pig中。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

以下是对上述说明的描述。

Relation name我们已将数据存储在学生(student)模式中。
Input file path我们从HDFS的/pig_data/目录中的 student_data.txt 文件读取数据。
Storage function我们使用了 PigStorage() 函数,将数据加载并存储为结构化文本文件。它采用分隔符,使用元组的每个实体作为参数分隔。默认情况下,它以“ "作为参数。
schema

我们已经使用以下模式存储了数据。

columnid名字姓氏电话号码城市
datatypeintchar arraychar arraychar arraychar array

注意: Load语句会简单地将数据加载到Pig的指定的关系中。要验证Load语句的执行情况,必须使用Diagnostic运算符,这将在后续的章节中讨论。

在上一章中,我们学习了如何将数据加载到Apache Pig中。你可以使用 store 运算符将加载的数据存储在文件系统中,本章介绍如何使用 Store 运算符在Apache Pig中存储数据。

语法

下面给出了Store语句的语法。

STORE Relation_name INTO ' required_directory_path ' [USING function];

假设我们在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

现在,让我们将关系存储在HDFS目录“/pig_Output/"中,如下所示。

grunt> STORE student INTO ' hdfs://localhost:9000/pig_Output/ ' USING PigStorage (',');

输出

执行 store 语句后,将获得以下输出。使用指定的名称创建目录,并将数据存储在其中。

2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - 100% complete2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:   HadoopVersion    PigVersion    UserId    StartedAt             FinishedAt             Features 2.6.0            0.15.0        Hadoop    2015-10-0 13:03:03    2015-10-05 13:05:05    UNKNOWN  Success!  Job Stats (time in seconds): JobId          Maps    Reduces    MaxMapTime    MinMapTime    AvgMapTime    MedianMapTime    job_14459_06    1        0           n/a           n/a           n/a           n/aMaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature        0                 0                0                0             student  MAP_ONLY OutPut folderhdfs://localhost:9000/pig_Output/  Input(s): Successfully read 0 records from: "hdfs://localhost:9000/pig_data/student_data.txt"  Output(s): Successfully stored 0 records in: "hdfs://localhost:9000/pig_Output"  Counters:Total records written : 0Total bytes written : 0Spillable Memory Manager spill count : 0 Total bags proactively spilled: 0Total records proactively spilled: 0  Job DAG: job_1443519499159_0006  2015-10-05 13:06:06,192 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - Success!

验证

你可以如下所示验证存储的数据。

步骤1

首先,使用 ls 命令列出名为 pig_output 的目录中的文件,如下所示。

hdfs dfs -ls 'hdfs://localhost:9000/pig_Output/'Found 2 itemsrw-r--r-   1 Hadoop supergroup          0 2015-10-05 13:03 hdfs://localhost:9000/pig_Output/_SUCCESSrw-r--r-   1 Hadoop supergroup        224 2015-10-05 13:03 hdfs://localhost:9000/pig_Output/part-m-00000

可以观察到在执行 store 语句后创建了两个文件。

步骤2

使用 cat 命令,列出名为 part-m-00000 的文件的内容,如下所示。

$ hdfs dfs -cat 'hdfs://localhost:9000/pig_Output/part-m-00000' 1,Rajiv,Reddy,9848022337,Hyderabad2,siddarth,Battacharya,9848022338,Kolkata3,Rajesh,Khanna,9848022339,Delhi4,Preethi,Agarwal,9848022330,Pune5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar6,Archana,Mishra,9848022335,Chennai 


Load 语句会简单地将数据加载到Apache Pig中的指定关系中。要验证Load语句的执行,必须使用Diagnostic运算符Pig Latin提供四种不同类型的诊断运算符:

  • Dump运算符
  • Describe运算符
  • Explanation运算符
  • Illustration运算符

在本章中,我们将讨论Pig Latin的Dump运算符。

Dump运算符

Dump 运算符用于运行Pig Latin语句,并在屏幕上显示结果,它通常用于调试目的。

语法

下面给出了 Dump 运算符的语法。

grunt> Dump Relation_Name

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

我们使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

现在,使用Dump运算符打印关系的内容,如下所示。

grunt> Dump student

一旦执行上述 Pig Latin 语句,将启动一个MapReduce作业以从HDFS读取数据,将产生以下输出。

2015-10-01 15:05:27,642 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete2015-10-01 15:05:27,652 [main]INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:   HadoopVersion  PigVersion  UserId    StartedAt             FinishedAt       Features             2.6.0          0.15.0      Hadoop  2015-10-01 15:03:11  2015-10-01 05:27     UNKNOWN                                                Success!  Job Stats (time in seconds):  JobId           job_14459_0004Maps                 1  Reduces              0  MaxMapTime          n/a    MinMapTime          n/aAvgMapTime          n/a MedianMapTime       n/aMaxReduceTime        0MinReduceTime        0  AvgReduceTime        0MedianReducetime     0Alias             student Feature           MAP_ONLY        Outputs           hdfs://localhost:9000/tmp/temp580182027/tmp757878456,Input(s): Successfully read 0 records from: "hdfs://localhost:9000/pig_data/student_data.txt"  Output(s): Successfully stored 0 records in: "hdfs://localhost:9000/tmp/temp580182027/tmp757878456"  Counters: Total records written : 0 Total bytes written : 0 Spillable Memory Manager spill count : 0Total bags proactively spilled: 0 Total records proactively spilled: 0  Job DAG: job_1443519499159_0004  2015-10-01 15:06:28,403 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - Success!2015-10-01 15:06:28,441 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.2015-10-01 15:06:28,485 [main]INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 12015-10-01 15:06:28,485 [main]INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input pathsto process : 1(1,Rajiv,Reddy,9848022337,Hyderabad)(2,siddarth,Battacharya,9848022338,Kolkata)(3,Rajesh,Khanna,9848022339,Delhi)(4,Preethi,Agarwal,9848022330,Pune)(5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai)


describe 运算符用于查看关系的模式。

语法

describe 运算符的语法如下

grunt> Describe Relation_name

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们描述名为student的关系,并验证模式如下所示。

grunt> describe student;

输出

执行上述 Pig Latin 语句后,将生成以下输出。

grunt> student: { id: int,firstname: chararray,lastname: chararray,phone: chararray,city: chararray }


explain 运算符用于显示关系的逻辑,物理和MapReduce执行计划。

语法

下面给出了 explain 运算符的语法。

grunt> explain Relation_name;

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们使用 explain 运算符解释名为student的关系,如下所示。

grunt> explain student;

输出

它将产生以下输出。

$ explain student;2015-10-05 11:32:43,660 [main]2015-10-05 11:32:43,660 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer -{RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator,GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer,PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}  #-----------------------------------------------# New Logical Plan: #-----------------------------------------------student: (Name: LOStore Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)| |---student: (Name: LOForEach Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)    |   |    |   (Name: LOGenerate[false,false,false,false,false] Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)ColumnPrune:InputUids=[34, 35, 32, 33,31]ColumnPrune:OutputUids=[34, 35, 32, 33, 31]    |   |   |     |   |   (Name: Cast Type: int Uid: 31)     |   |   |     |   |   |---id:(Name: Project Type: bytearray Uid: 31 Input: 0 Column: (*))    |   |   |         |   |   (Name: Cast Type: chararray Uid: 32)    |   |   |     |   |   |---firstname:(Name: Project Type: bytearray Uid: 32 Input: 1Column: (*))    |   |   |    |   |   (Name: Cast Type: chararray Uid: 33)    |   |   |    |   |   |---lastname:(Name: Project Type: bytearray Uid: 33 Input: 2	 Column: (*))    |   |   |     |   |   (Name: Cast Type: chararray Uid: 34)    |   |   |      |   |   |---phone:(Name: Project Type: bytearray Uid: 34 Input: 3 Column:(*))    |   |   |     |   |   (Name: Cast Type: chararray Uid: 35)    |   |   |      |   |   |---city:(Name: Project Type: bytearray Uid: 35 Input: 4 Column:(*))    |   |     |   |---(Name: LOInnerLoad[0] Schema: id#31:bytearray)    |   |      |   |---(Name: LOInnerLoad[1] Schema: firstname#32:bytearray)    |   |    |   |---(Name: LOInnerLoad[2] Schema: lastname#33:bytearray)    |   |    |   |---(Name: LOInnerLoad[3] Schema: phone#34:bytearray)    |   |     |   |---(Name: LOInnerLoad[4] Schema: city#35:bytearray)    |    |---student: (Name: LOLoad Schema: id#31:bytearray,firstname#32:bytearray,lastname#33:bytearray,phone#34:bytearray,city#35:bytearray)RequiredFields:null #-----------------------------------------------# Physical Plan: #-----------------------------------------------student: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36| |---student: New For Each(false,false,false,false,false)[bag] - scope-35    |   |    |   Cast[int] - scope-21    |   |    |   |---Project[bytearray][0] - scope-20    |   |      |   Cast[chararray] - scope-24    |   |    |   |---Project[bytearray][1] - scope-23    |   |     |   Cast[chararray] - scope-27    |   |      |   |---Project[bytearray][2] - scope-26     |   |      |   Cast[chararray] - scope-30     |   |      |   |---Project[bytearray][3] - scope-29    |   |    |   Cast[chararray] - scope-33    |   |     |   |---Project[bytearray][4] - scope-32    |     |---student: Load(hdfs://localhost:9000/pig_data/student_data.txt:PigStorage(',')) - scope192015-10-05 11:32:43,682 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false2015-10-05 11:32:43,684 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOp timizer - MR plan size before optimization: 1 2015-10-05 11:32:43,685 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOp timizer - MR plan size after optimization: 1 #--------------------------------------------------# Map Reduce Plan                                   #--------------------------------------------------MapReduce node scope-37Map Planstudent: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36||---student: New For Each(false,false,false,false,false)[bag] - scope-35    |   |    |   Cast[int] - scope-21     |   |    |   |---Project[bytearray][0] - scope-20    |   |    |   Cast[chararray] - scope-24    |   |    |   |---Project[bytearray][1] - scope-23    |   |    |   Cast[chararray] - scope-27    |   |     |   |---Project[bytearray][2] - scope-26     |   |     |   Cast[chararray] - scope-30     |   |      |   |---Project[bytearray][3] - scope-29     |   |     |   Cast[chararray] - scope-33    |   |     |   |---Project[bytearray][4] - scope-32     |      |---student:Load(hdfs://localhost:9000/pig_data/student_data.txt:PigStorage(',')) - scope19-------- Global sort: false ---------------- 


illustrate 运算符为你提供了一系列语句的逐步执行。

语法

下面给出了illustrate运算符的语法。

grunt> illustrate Relation_name;

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata 003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune 005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们说明如下所示的名为student的关系。

grunt> illustrate student;

输出

在执行上面的语句时,将获得以下输出。

grunt> illustrate student;INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$M ap - Aliasesbeing processed per job phase (AliasName[line,offset]): M: student[1,10] C:  R:---------------------------------------------------------------------------------------------|student | id:int | firstname:chararray | lastname:chararray | phone:chararray | city:chararray |--------------------------------------------------------------------------------------------- |        | 002    | siddarth            | Battacharya        | 9848022338      | Kolkata        |---------------------------------------------------------------------------------------------


GROUP 运算符用于在一个或多个关系中对数据进行分组,它收集具有相同key的数据。

语法

下面给出了 group 运算符的语法。

grunt> Group_data = GROUP Relation_name BY age;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi004,Preethi,Agarwal,21,9848022330,Pune005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar006,Archana,Mishra,23,9848022335,Chennai007,Komal,Nayak,24,9848022334,trivendram008,Bharathi,Nambiayar,24,9848022333,Chennai

将这个文件加载到Apache Pig中,关系名称为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在,让我们按照年龄关系中的记录/元组进行分组,如下所示。

grunt> group_data = GROUP student_details by age;

验证

使用 DUMP 运算符验证关系 group_data ,如下所示。

grunt> Dump group_data;

输出

将获得显示名为group_data关系的内容的输出,如下所示。在这里你可以观察到结果模式有两列:

  • 一个是age,通过它我们将关系分组。

  • 另一个是bag,其中包含一组元组,有各自年龄的学生记录。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,21,9848022337,Hydera bad)})(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battacharya,22,984802233 8,Kolkata)})(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)})(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)})

在使用 describe 命令分组数据后,可以看到表的模式,如下所示。

grunt> Describe group_data;  group_data: {group: int,student_details: {(id: int,firstname: chararray,               lastname: chararray,age: int,phone: chararray,city: chararray)}}

以同样的方式,可以使用illustrate命令获取模式的示例说明,如下所示。

$ Illustrate group_data;

它将产生以下输出

------------------------------------------------------------------------------------------------- |group_data|  group:int | student_details:bag{:tuple(id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray)}|------------------------------------------------------------------------------------------------- |          |     21     | { 4, Preethi, Agarwal, 21, 9848022330, Pune), (1, Rajiv, Reddy, 21, 9848022337, Hyderabad)}| |          |     2      | {(2,siddarth,Battacharya,22,9848022338,Kolkata),(003,Rajesh,Khanna,22,9848022339,Delhi)}| -------------------------------------------------------------------------------------------------

按多列分组

让我们按年龄和城市对关系进行分组,如下所示。

grunt> group_multiple = GROUP student_details by (age, city);

可以使用Dump运算符验证名为 group_multiple 的关系的内容,如下所示。

grunt> Dump group_multiple;   ((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})((22,Delhi),{(3,Rajesh,Khanna,22,9848022339,Delhi)})((22,Kolkata),{(2,siddarth,Battacharya,22,9848022338,Kolkata)})((23,Chennai),{(6,Archana,Mishra,23,9848022335,Chennai)})((23,Bhuwaneshwar),{(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})((24,Chennai),{(8,Bharathi,Nambiayar,24,9848022333,Chennai)})(24,trivendram),{(7,Komal,Nayak,24,9848022334,trivendram)})

Group All

你可以按所有的列对关系进行分组,如下所示。

grunt> group_all = GROUP student_details All;

现在,请验证关系 group_all 的内容,如下所示。

grunt> Dump group_all;    (all,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334 ,trivendram), (6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuw aneshwar), (4,Preethi,Agarwal,21,9848022330,Pune),(3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata),(1,Rajiv,Reddy,21,9848022337,Hyd erabad)})


COGROUP 运算符的运作方式与 GROUP 运算符相同。两个运算符之间的唯一区别是 group 运算符通常用于一个关系,而 cogroup 运算符用于涉及两个或多个关系的语句。

使用Cogroup分组两个关系

假设在HDFS目录 /pig_data/ 中有两个文件,即 student_details.txt employee_details.txt ,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi004,Preethi,Agarwal,21,9848022330,Pune005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar006,Archana,Mishra,23,9848022335,Chennai007,Komal,Nayak,24,9848022334,trivendram008,Bharathi,Nambiayar,24,9848022333,Chennai

employee_details.txt

001,Robin,22,newyork 002,BOB,23,Kolkata 003,Maya,23,Tokyo 004,Sara,25,London 005,David,23,Bhuwaneshwar 006,Maggy,22,Chennai

将这些文件分别加载到Pig中,关系名称分别为 student_details employee_details ,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);   grunt> employee_details = LOAD 'hdfs://localhost:9000/pig_data/employee_details.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, city:chararray);

现在,将 student_details employee_details 关系的记录/元组按关键字age进行分组,如下所示。

grunt> cogroup_data = COGROUP student_details by age, employee_details by age;

验证

使用 DUMP 运算符验证关系 cogroup_data ,如下所示。

grunt> Dump cogroup_data;

输出

它将产生以下输出,显示名为 cogroup_data 的关系的内容,如下所示。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune), (1,Rajiv,Reddy,21,9848022337,Hyderabad)},    {    })  (22,{ (3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata) },     { (6,Maggy,22,Chennai),(1,Robin,22,newyork) })  (23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)},    {(5,David,23,Bhuwaneshwar),(3,Maya,23,Tokyo),(2,BOB,23,Kolkata)}) (24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)},    { })  (25,{   },    {(4,Sara,25,London)})

cogroup 运算符根据年龄对来自每个关系的元组进行分组,其中每个组描述特定的年龄值。

例如,如果我们考虑结果的第一个元组,它按照年龄21分组,那它包含两个包

  • 第一个包保存了具有21岁的第一关系(在这种情况下是 student_details )的所有元组;

  • 第二个包具有第二关系(在这种情况下为 employee_details )的所有元组,其年龄为21岁。

如果关系不具有年龄值为21的元组,则返回一个空包。

JOIN 运算符用于组合来自两个或多个关系的记录。在执行连接操作时,我们从每个关系中声明一个(或一组)元组作为key。 当这些key匹配时,两个特定的元组匹配,否则记录将被丢弃。连接可以是以下类型:

  • Self-join
  • Inner-join
  • Outer-join − left join, right join, and full join

本章介绍了如何在Pig Latin中使用join运算符的示例。假设在HDFS的 /pig_data/ 目录中有两个文件,即 customers.txt orders.txt ,如下所示。

customers.txt

1,Ramesh,32,Ahmedabad,2000.002,Khilan,25,Delhi,1500.003,kaushik,23,Kota,2000.004,Chaitali,25,Mumbai,6500.00 5,Hardik,27,Bhopal,8500.006,Komal,22,MP,4500.007,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000100,2009-10-08 00:00:00,3,1500101,2009-11-20 00:00:00,2,1560103,2008-05-20 00:00:00,4,2060

我们将这两个文件 customers 和 orders 关系一起加载到Pig中,如下所示。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',')   as (oid:int, date:chararray, customer_id:int, amount:int);

现在让我们对这两个关系执行各种连接操作。

Self-join(自连接)

Self-join 用于将表与其自身连接,就像表是两个关系一样,临时重命名至少一个关系。通常,在Apache Pig中,为了执行self-join,我们将在不同的别名(名称)下多次加载相同的数据。那么,将文件 customers.txt 的内容加载为两个表,如下所示。

grunt> customers1 = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> customers2 = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int); 

语法

下面给出使用 JOIN 运算符执行self-join操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY key, Relation2_name BY key ;

通过如图所示加入两个关系 customers1 customers2 ,对关系 customers 执行self-join 操作

grunt> customers3 = JOIN customers1 BY id, customers2 BY id;

验证

使用 DUMP 运算符验证关系 customers3 ,如下所示。

grunt> Dump customers3;

输出

将产生以下输出,显示关系 customers 的内容。

(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000)(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)(3,kaushik,23,Kota,2000,3,kaushik,23,Kota,2000)(4,Chaitali,25,Mumbai,6500,4,Chaitali,25,Mumbai,6500)(5,Hardik,27,Bhopal,8500,5,Hardik,27,Bhopal,8500)(6,Komal,22,MP,4500,6,Komal,22,MP,4500)(7,Muffy,24,Indore,10000,7,Muffy,24,Indore,10000)

Inner Join(内部连接)

Inner Join使用较为频繁;它也被称为等值连接当两个表中都存在匹配时,内部连接将返回行。基于连接谓词(join-predicate),通过组合两个关系(例如A和B)的列值来创建新关系。查询将A的每一行与B的每一行进行比较,以查找满足连接谓词的所有行对。当连接谓词被满足时,A和B的每个匹配的行对的列值被组合成结果行。

语法

以下是使用 JOIN 运算符执行inner join操作的语法。

grunt> result = JOIN relation1 BY columnname, relation2 BY columnname;

让我们对customersorders执行inner join操作,如下所示。

grunt> coustomer_orders = JOIN customers BY id, orders BY customer_id;

验证

使用 DUMP 运算符验证 coustomer_orders 关系,如下所示。

grunt> Dump coustomer_orders;

输出

将获得以下输出,是名为 coustomer_orders 的关系的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

注意

Outer Join:与inner join不同,outer join返回至少一个关系中的所有行。outer join操作以三种方式执行:

  • Left outer join
  • Right outer join
  • Full outer join

Left Outer Join(左外连接)

left outer join操作返回左表中的所有行,即使右边的关系中没有匹配项

语法

下面给出使用 JOIN 运算符执行left outer join操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY id LEFT OUTER, Relation2_name BY customer_id;

让我们对customers和orders的两个关系执行left outer join操作,如下所示。

grunt> outer_left = JOIN customers BY id LEFT OUTER, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_left ,如下所示。

grunt> Dump outer_left;

输出

它将产生以下输出,显示关系 outer_left 的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)(5,Hardik,27,Bhopal,8500,,,,)(6,Komal,22,MP,4500,,,,)(7,Muffy,24,Indore,10000,,,,) 

Right Outer Join(右外连接)

right outer join操作将返回右表中的所有行,即使左表中没有匹配项。

语法

下面给出使用 JOIN 运算符执行right outer join操作的语法。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

让我们对customersorders执行right outer join操作,如下所示。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_right ,如下所示。

grunt> Dump outer_right

输出

它将产生以下输出,显示关系 outer_right 的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

Full Outer Join(全外连接)

当一个关系中存在匹配时,full outer join操作将返回行。

语法

下面给出使用 JOIN 运算符执行full outer join的语法。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

让我们对customersorders执行full outer join操作,如下所示。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_full ,如下所示。

grun> Dump outer_full; 

输出

它将产生以下输出,显示关系 outer_full 的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)(5,Hardik,27,Bhopal,8500,,,,)(6,Komal,22,MP,4500,,,,)(7,Muffy,24,Indore,10000,,,,)

使用多个Key

我们可以使用多个key执行JOIN操作。

语法

下面是如何使用多个key对两个表执行JOIN操作。

grunt> Relation3_name = JOIN Relation2_name BY (key1, key2), Relation3_name BY (key1, key2);

假设在HDFS的 /pig_data/ 目录中有两个文件,即 employee.txt employee_contact.txt ,如下所示。

employee.txt

001,Rajiv,Reddy,21,programmer,003002,siddarth,Battacharya,22,programmer,003003,Rajesh,Khanna,22,programmer,003004,Preethi,Agarwal,21,programmer,003005,Trupthi,Mohanthy,23,programmer,003006,Archana,Mishra,23,programmer,003007,Komal,Nayak,24,teamlead,002008,Bharathi,Nambiayar,24,manager,001

employee_contact.txt

001,9848022337,Rajiv@gmail.com,Hyderabad,003002,9848022338,siddarth@gmail.com,Kolkata,003003,9848022339,Rajesh@gmail.com,Delhi,003004,9848022330,Preethi@gmail.com,Pune,003005,9848022336,Trupthi@gmail.com,Bhuwaneshwar,003006,9848022335,Archana@gmail.com,Chennai,003007,9848022334,Komal@gmail.com,trivendram,002008,9848022333,Bharathi@gmail.com,Chennai,001

将这两个文件加载到Pig中,通过关系 employee employee_contact ,如下所示。

grunt> employee = LOAD 'hdfs://localhost:9000/pig_data/employee.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, designation:chararray, jobid:int);  grunt> employee_contact = LOAD 'hdfs://localhost:9000/pig_data/employee_contact.txt' USING PigStorage(',')    as (id:int, phone:chararray, email:chararray, city:chararray, jobid:int);

现在,让我们使用 JOIN 运算符连接这两个关系的内容,如下所示。

grunt> emp = JOIN employee BY (id,jobid), employee_contact BY (id,jobid);

验证

使用 DUMP 运算符验证关系 emp ,如下所示。

grunt> Dump emp; 

输出

它将产生以下输出,显示名为 emp 的关系的内容,如下所示。

(1,Rajiv,Reddy,21,programmer,113,1,9848022337,Rajiv@gmail.com,Hyderabad,113)(2,siddarth,Battacharya,22,programmer,113,2,9848022338,siddarth@gmail.com,Kolka ta,113)  (3,Rajesh,Khanna,22,programmer,113,3,9848022339,Rajesh@gmail.com,Delhi,113)  (4,Preethi,Agarwal,21,programmer,113,4,9848022330,Preethi@gmail.com,Pune,113)  (5,Trupthi,Mohanthy,23,programmer,113,5,9848022336,Trupthi@gmail.com,Bhuwaneshw ar,113)  (6,Archana,Mishra,23,programmer,113,6,9848022335,Archana@gmail.com,Chennai,113)  (7,Komal,Nayak,24,teamlead,112,7,9848022334,Komal@gmail.com,trivendram,112)  (8,Bharathi,Nambiayar,24,manager,111,8,9848022333,Bharathi@gmail.com,Chennai,111)


CROSS 运算符计算两个或多个关系的向量积。本章将以示例说明如何在Pig Latin中使用cross运算符。

语法

下面给出了 CROSS 运算符的语法。

grunt> Relation3_name = CROSS Relation1_name, Relation2_name;

假设在HDFS的 /pig_data/ 目录中有两个文件,即 customers.txt orders.txt ,如下所示。

customers.txt

1,Ramesh,32,Ahmedabad,2000.002,Khilan,25,Delhi,1500.003,kaushik,23,Kota,2000.004,Chaitali,25,Mumbai,6500.005,Hardik,27,Bhopal,8500.006,Komal,22,MP,4500.007,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000100,2009-10-08 00:00:00,3,1500101,2009-11-20 00:00:00,2,1560103,2008-05-20 00:00:00,4,2060

将这两个文件加载到Pig中,通过关系 customers  orders,如下所示。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',')   as (oid:int, date:chararray, customer_id:int, amount:int);

现在让我们使用 cross 运算符获得这两个关系的向量积,如下所示。

grunt> cross_data = CROSS customers, orders;

验证

使用 DUMP 运算符验证关系 cross_data ,如下所示。

grunt> Dump cross_data;

输出

它将产生以下输出,显示关系 cross_data 的内容。

(7,Muffy,24,Indore,10000,103,2008-05-20 00:00:00,4,2060) (7,Muffy,24,Indore,10000,101,2009-11-20 00:00:00,2,1560) (7,Muffy,24,Indore,10000,100,2009-10-08 00:00:00,3,1500) (7,Muffy,24,Indore,10000,102,2009-10-08 00:00:00,3,3000) (6,Komal,22,MP,4500,103,2008-05-20 00:00:00,4,2060) (6,Komal,22,MP,4500,101,2009-11-20 00:00:00,2,1560) (6,Komal,22,MP,4500,100,2009-10-08 00:00:00,3,1500) (6,Komal,22,MP,4500,102,2009-10-08 00:00:00,3,3000) (5,Hardik,27,Bhopal,8500,103,2008-05-20 00:00:00,4,2060) (5,Hardik,27,Bhopal,8500,101,2009-11-20 00:00:00,2,1560) (5,Hardik,27,Bhopal,8500,100,2009-10-08 00:00:00,3,1500) (5,Hardik,27,Bhopal,8500,102,2009-10-08 00:00:00,3,3000) (4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060) (4,Chaitali,25,Mumbai,6500,101,2009-20 00:00:00,4,2060) (2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) (2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500) (2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) (1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) (1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) (1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) (1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)-11-20 00:00:00,2,1560) (4,Chaitali,25,Mumbai,6500,100,2009-10-08 00:00:00,3,1500) (4,Chaitali,25,Mumbai,6500,102,2009-10-08 00:00:00,3,3000) (3,kaushik,23,Kota,2000,103,2008-05-20 00:00:00,4,2060) (3,kaushik,23,Kota,2000,101,2009-11-20 00:00:00,2,1560) (3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500) (3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000) (2,Khilan,25,Delhi,1500,103,2008-05-20 00:00:00,4,2060) (2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) (2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500)(2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) (1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) (1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) (1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) (1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)  


Pig Latin的 UNION 运算符用于合并两个关系的内容。要对两个关系执行UNION操作,它们的列和域必须相同。

语法

下面给出了 UNION 运算符的语法。

grunt> Relation_name3 = UNION Relation_name1, Relation_name2;

假设在HDFS的 /pig_data/ 目录中有两个文件,即 student_data1.txt student_data2.txt ,如下所示。

Student_data1.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

Student_data2.txt

7,Komal,Nayak,9848022334,trivendram.8,Bharathi,Nambiayar,9848022333,Chennai.

将这两个文件加载到Pig中,通过关系 student1 student2 ,如下所示。

grunt> student1 = LOAD 'hdfs://localhost:9000/pig_data/student_data1.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);  grunt> student2 = LOAD 'hdfs://localhost:9000/pig_data/student_data2.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用 UNION 运算符合并这两个关系的内容,如下所示。

grunt> student = UNION student1, student2;

验证

使用 DUMP 运算子验证关系student,如下所示。

grunt> Dump student; 

输出

它将显示以下输出,显示关系student的内容。

(1,Rajiv,Reddy,9848022337,Hyderabad) (2,siddarth,Battacharya,9848022338,Kolkata)(3,Rajesh,Khanna,9848022339,Delhi)(4,Preethi,Agarwal,9848022330,Pune) (5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai) (7,Komal,Nayak,9848022334,trivendram) (8,Bharathi,Nambiayar,9848022333,Chennai)


SPLIT 运算符用于将关系拆分为两个或多个关系。

语法

下面给出了 SPLIT 运算符的语法。

grunt> SPLIT Relation1_name INTO Relation2_name IF (condition1), Relation2_name (condition2),

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到Pig中,如下所示。

student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray); 

现在,让我们将关系分为两个,一个列出年龄小于23岁的员工,另一个列出年龄在22到25岁之间的员工。

SPLIT student_details into student_details1 if age<23, student_details2 if (22<age and age>25);

验证

使用 DUMP 操作符验证关系 student_details1 student_details2 ,如下所示。

grunt> Dump student_details1;  grunt> Dump student_details2; 

输出

它将产生以下输出,分别显示关系 student_details1 student_details2 的内容。

grunt> Dump student_details1; (1,Rajiv,Reddy,21,9848022337,Hyderabad) (2,siddarth,Battacharya,22,9848022338,Kolkata)(3,Rajesh,Khanna,22,9848022339,Delhi) (4,Preethi,Agarwal,21,9848022330,Pune)  grunt> Dump student_details2; (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) (6,Archana,Mishra,23,9848022335,Chennai) (7,Komal,Nayak,24,9848022334,trivendram) (8,Bharathi,Nambiayar,24,9848022333,Chennai)


FILTER 运算符用于根据条件从关系中选择所需的元组。

语法

下面给出了 FILTER 运算符的语法。

grunt> Relation2_name = FILTER Relation1_name BY (condition);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

将此文件通过关系 student_details 加载到 的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在使用Filter运算符来获取属于Chennai城市的学生的详细信息。

filter_data = FILTER student_details BY city == 'Chennai';

验证

使用 DUMP 运算符验证关系 filter_data ,如下所示。

grunt> Dump filter_data;

输出

它将产生以下输出,显示关系 filter_data 的内容如下。

(6,Archana,Mishra,23,9848022335,Chennai)(8,Bharathi,Nambiayar,24,9848022333,Chennai)


DISTINCT 运算符用于从关系中删除冗余(重复)元组。

语法

下面给出了 DISTINCT 运算符的语法。

grunt> Relation_name2 = DISTINCT Relatin_name1;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata 002,siddarth,Battacharya,9848022338,Kolkata 003,Rajesh,Khanna,9848022339,Delhi 003,Rajesh,Khanna,9848022339,Delhi 004,Preethi,Agarwal,9848022330,Pune 005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai 006,Archana,Mishra,9848022335,Chennai

通过关系 student_details 将此文件加载到Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用 DISTINCT 运算符从 student_details 关系中删除冗余(重复)元组,并将其另存在一个名为 distinct_data 的关系 如下所示。

grunt> distinct_data = DISTINCT student_details;

验证

使用 DUMP 运算符验证关系 distinct_data ,如下所示。

grunt> Dump distinct_data;

输出

它将产生以下输出,显示关系 distinct_data 的内容如下。

(1,Rajiv,Reddy,9848022337,Hyderabad)(2,siddarth,Battacharya,9848022338,Kolkata) (3,Rajesh,Khanna,9848022339,Delhi) (4,Preethi,Agarwal,9848022330,Pune) (5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai)


FOREACH 运算符用于基于列数据生成指定的数据转换。

语法

下面给出了 FOREACH 运算符的语法。

grunt> Relation_name2 = FOREACH Relatin_name1 GENERATE (required data);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在让我们从关系 student_details 中获取每个学生的id,age和city值,并使用 foreach 运算符将它存储到另一个名为 foreach_data  关系,如下所示。

grunt> foreach_data = FOREACH student_details GENERATE id,age,city;

验证

使用 DUMP 运算符验证关系 foreach_data ,如下所示。

grunt> Dump foreach_data;

输出

它将产生以下输出,显示关系 foreach_data 的内容。

(1,21,Hyderabad)(2,22,Kolkata)(3,22,Delhi)(4,21,Pune) (5,23,Bhuwaneshwar)(6,23,Chennai) (7,24,trivendram)(8,24,Chennai) 


ORDER BY 运算符用于以基于一个或多个字段的排序顺序显示关系的内容。

语法

下面给出了 ORDER BY 运算符的语法。

grunt> Relation_name2 = ORDER Relatin_name1 BY (ASC|DESC);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在让我们根据学生的年龄以降序排列关系,并使用 ORDER BY 运算符将它存储到另一个名为 order_by_data 的关系中,如下所示。

grunt> order_by_data = ORDER student_details BY age DESC;

验证

使用 DUMP 运算符验证关系 order_by_data ,如下所示。

grunt> Dump order_by_data; 

输出

它将产生以下输出,显示关系 order_by_data 的内容。

(8,Bharathi,Nambiayar,24,9848022333,Chennai)(7,Komal,Nayak,24,9848022334,trivendram)(6,Archana,Mishra,23,9848022335,Chennai) (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)(3,Rajesh,Khanna,22,9848022339,Delhi) (2,siddarth,Battacharya,22,9848022338,Kolkata)(4,Preethi,Agarwal,21,9848022330,Pune) (1,Rajiv,Reddy,21,9848022337,Hyderabad)


LIMIT 运算符用于从关系中获取有限数量的元组。

语法

下面给出了 LIMIT 运算符的语法。

grunt> Result = LIMIT Relation_name required number of tuples;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在,让我们根据学生的年龄以降序对关系进行排序,并使用 ORDER BY 运算符将其存储到另一个名为 limit_data 的关系中,如下所示。

grunt> limit_data = LIMIT student_details 4; 

验证

使用 DUMP 运算符验证关系 limit_data ,如下所示。

grunt> Dump limit_data; 

输出

它将产生以下输出,显示关系 limit_data 的内容如下。

(1,Rajiv,Reddy,21,9848022337,Hyderabad) (2,siddarth,Battacharya,22,9848022338,Kolkata) (3,Rajesh,Khanna,22,9848022339,Delhi) (4,Preethi,Agarwal,21,9848022330,Pune) 


Apache Pig提供了各种内置函数,即 eval,load,store,math,string,bag tuple 函数。

Eval函数

下面给出了Apache Pig提供的 eval 函数列表。

S.N.函数 & 描述
1AVG()

计算包内数值的平均值。

2BagToString()

将包的元素连接成字符串。在连接时,我们可以在这些值之间放置分隔符(可选)。

3CONCAT()

连接两个或多个相同类型的表达式。

4COUNT()

获取包中元素的数量,同时计算包中元组的数量。

5COUNT_STAR()

它类似于 COUNT() 函数。 它用于获取包中的元素数量。

6DIFF()

比较元组中的两个包(字段)。

7IsEmpty()

检查包或映射是否为空。

8MAX()

计算单列包中的列(数值或字符)的最大值。

9MIN()

要获取单列包中特定列的最小(最低)值(数字或字符)。

10PluckTuple()

使用Pig Latin的  PluckTuple() 函数,可以定义字符串Prefix,并过滤以给定prefix开头的关系中的列。

11SIZE()

基于任何Pig数据类型计算元素的数量。

12SUBTRACT()

两个包相减, 它需要两个包作为输入,并返回包含第一个包中不在第二个包中的元组的包。

13SUM()

要获取单列包中某列的数值总和。

14TOKENIZE()

要在单个元组中拆分字符串(其中包含一组字),并返回包含拆分操作的输出的包。


Apache Pig中的加载存储函数用于确定数据如何从Pig中弹出。这些函数与加载和存储运算符一起使用。下面给出了Pig中可用的加载和存储函数的列表。

S.N.函数 & 描述
1PigStorage()

加载和存储结构化文件。

2TextLoader()

将非结构化数据加载到Pig中。

3BinStorage()

使用机器可读格式将数据加载并存储到Pig中。

4Handling Compression

在Pig Latin中,我们可以加载和存储压缩数据。


下面给出了Bag和Tuple函数的列表。

S.N.函数 & 描述
1TOBAG()

将两个或多个表达式转换为包。

2TOP()

获取关系的顶部 N 个元组。

3TOTUPLE()

将一个或多个表达式转换为元组。

4TOMAP()

将key-value对转换为Map。


在Apache Pig中有以下String函数。

S.N.函数 & 描述
1ENDSWITH(string, testAgainst)

验证给定字符串是否以特定子字符串结尾。

2STARTSWITH(string, substring)

接受两个字符串参数,并验证第一个字符串是否以第二个字符串开头。

3SUBSTRING(string, startIndex, stopIndex)

返回来自给定字符串的子字符串。

4EqualsIgnoreCase(string1, string2)

比较两个字符串,忽略大小写。

5INDEXOF(string, ‘character’, startIndex)

返回字符串中第一个出现的字符,从开始索引向前搜索。

6LAST_INDEX_OF(expression)

返回字符串中最后一次出现的字符的索引,从开始索引向后搜索。

7LCFIRST(expression)

将字符串中的第一个字符转换为小写。

8UCFIRST(expression)

返回一个字符串,其中第一个字符转换为大写。

9UPPER(expression)

返回转换为大写的字符串。

10LOWER(expression)

将字符串中的所有字符转换为小写。

11REPLACE(string, ‘oldChar’, ‘newChar’);

使用新字符替换字符串中的现有字符。

12STRSPLIT(string, regex, limit)

围绕给定正则表达式的匹配拆分字符串。

13STRSPLITTOBAG(string, regex, limit)

STRSPLIT() 函数类似,它通过给定的分隔符将字符串拆分,并将结果返回到包中。

14TRIM(expression)

返回删除了前端和尾部空格的字符串的副本。

15LTRIM(expression)

返回删除了前端空格的字符串的副本。

16RTRIM(expression)

返回已删除尾部空格的字符串的副本。


Apache Pig提供以下日期和时间函数 -

S.N.函数 & 描述
1ToDate(milliseconds)

此函数根据给定的参数返回日期时间对象。此函数的另一个替代方法是ToDate(iosstring),ToDate(userstring,format),ToDate(userstring,format,timezone)

2CurrentTime()

返回当前时间的日期时间对象。

3GetDay(datetime)

从日期时间对象返回一个月中的某一天。

4GetHour(datetime)

从日期时间对象返回一天中的小时。

5GetMilliSecond(datetime)

从日期时间对象返回秒中的毫秒。

6GetMinute(datetime)

从日期时间对象返回一小时中的分钟。

7GetMonth(datetime)

从日期时间对象返回一年中的月份。

8GetSecond(datetime)

从日期时间对象返回一分钟的秒。

9GetWeek(datetime)

从日期时间对象返回一年中的周。

10GetWeekYear(datetime)

从日期时间对象返回周年。

11GetYear(datetime)

从日期时间对象返回年份。

12AddDuration(datetime, duration)

返回日期时间对象的结果以及持续时间对象。

13SubtractDuration(datetime, duration)

从Date-Time对象中减去Duration对象并返回结果。

14DaysBetween(datetime1, datetime2)

返回两个日期时间对象之间的天数。

15HoursBetween(datetime1, datetime2)

返回两个日期时间对象之间的小时数。

16MilliSecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的毫秒数。

17MinutesBetween(datetime1, datetime2)

返回两个日期时间对象之间的分钟数。

18MonthsBetween(datetime1, datetime2)

返回两个日期时间对象之间的月数。

19SecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的秒数。

20WeeksBetween(datetime1, datetime2)

返回两个日期时间对象之间的周数。

21YearsBetween(datetime1, datetime2)

返回两个日期时间对象之间的年数。


我们在Apache Pig中有以下Math(数学)函数:

S.N.函数 & 描述
1ABS(expression)

获取表达式的绝对值。

2ACOS(expression)

获得表达式的反余弦值。

3ASIN(expression)

获取表达式的反正弦值。

4ATAN(expression)

此函数用于获取表达式的反正切值。

5CBRT(expression)

此函数用于获取表达式的立方根。

6CEIL(expression)

此函数用于获取向上舍入到最接近的整数的表达式的值(近1取整)。

7COS(expression)

此函数用于获取表达式的三角余弦值。

8COSH(expression)

此函数用于获取表达式的双曲余弦值。

9EXP(expression)

此函数用于获得欧拉数e乘以x的幂,即指数。

10FLOOR(expression)

要获得向下取整为最接近整数的表达式的值(四舍五入取整)。

11LOG(expression)

获得表达式的自然对数(基于e)。

12LOG10(expression)

得到表达式的基于10的对数。

13RANDOM( )

获得大于或等于0.0且小于1.0的伪随机数(double类型)。

14ROUND(expression)

要将表达式的值四舍五入为整数(如果结果类型为float)或四舍五入为长整型(如果结果类型为double)。

15SIN(expression)

获得表达式的正弦值。

16SINH(expression)

获得表达式的双曲正弦值。

17SQRT(expression)

获得表达式的正平方根。

18TAN(expression)

获得角度的三角正切。

19TANH(expression)

获得表达式的双曲正切。


除了内置函数之外,Apache Pig还为 User Defined Function(UDF:用户定义函数)提供广泛的支持。使用这些UDF,可以定义我们自己的函数并使用它们。UDF支持六种编程语言,即Java,Jython,Python,JavaScript,Ruby和Groovy。

对于编写UDF,在Java中提供全面的支持,并在所有其他语言中提供有限的支持。使用Java,你可以编写涉及处理的所有部分的UDF,如数据加载/存储,列转换和聚合。由于Apache Pig是用Java编写的,因此与其他语言相比,使用Java语言编写的UDF工作效率更高。

在Apache Pig中,我们还有一个用于UDF名为 Piggybank 的Java存储库。使用Piggybank,我们可以访问由其他用户编写的Java UDF,并贡献我们自己的UDF。

Java中的UDF的类型

在使用Java编写UDF时,我们可以创建和使用以下三种类型的函数

  • Filter函数 - Filter(过滤)函数用作过滤器语句中的条件。这些函数接受Pig值作为输入并返回布尔值。

  • Eval函数 - Eval函数在FOREACH-GENERATE语句中使用。这些函数接受Pig值作为输入并返回Pig结果。

  • Algebraic函数 - Algebraic(代数)函数对FOREACHGENERATE语句中的内包起作用。这些函数用于对内包执行完全MapReduce操作。

使用Java编写UDF

要使用Java编写UDF,我们必须集成jar文件 Pig-0.15.0.jar 在本章节中,将讨论如何使用Eclipse编写示例UDF。在继续学习前,请确保你已在系统中安装了Eclipse和Maven。

按照下面给出的步骤写一个UDF函数:

  • 打开Eclipse并创建一个新项目(例如 myproject )。

  • 将新创建的项目转换为Maven项目。

  • 在pom.xml中复制以下内容。此文件包含Apache Pig和Hadoop-core jar文件的Maven依赖关系。

<project xmlns = "http://maven.apache.org/POM/4.0.0"   xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0http://maven.apache .org/xsd/maven-4.0.0.xsd"> 	   <modelVersion>4.0.0</modelVersion>    <groupId>Pig_Udf</groupId>    <artifactId>Pig_Udf</artifactId>    <version>0.0.1-SNAPSHOT</version>	   <build>          <sourceDirectory>src</sourceDirectory>          <plugins>               <plugin>                    <artifactId>maven-compiler-plugin</artifactId>                    <version>3.3</version>                    <configuration>                         <source>1.7</source>                         <target>1.7</target>                    </configuration>               </plugin>          </plugins>     </build>	   <dependencies> 	      <dependency>                     <groupId>org.apache.pig</groupId>                     <artifactId>pig</artifactId>                     <version>0.15.0</version>           </dependency> 		      <dependency>                 <groupId>org.apache.hadoop</groupId>                     <artifactId>hadoop-core</artifactId>                     <version>0.20.2</version>           </dependency>          </dependencies>  	</project>
  • 保存文件并刷新它。 Maven依赖关系部分中,可以找到下载的jar文件。

  • 创建名为 Sample_Eval 的新的类文件,并在其中复制以下内容。

import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;  import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;public class Sample_Eval extends EvalFunc<String>{    public String exec(Tuple input) throws IOException {         if (input == null || input.size() == 0)            return null;            String str = (String)input.get(0);            return str.toUpperCase();     } }

在编写UDF时,必须继承EvalFunc类并向 exec() 函数提供实现。在此函数中,写入UDF所需的代码。在上面的例子中,我们返回了将给定列的内容转换为大写的代码。

  • 编译完类并确认没有错误后,右键单击Sample_Eval.java文件。它将呈现一个菜单。选择“export”,如以下屏幕截图所示。

选择export
  • 点击“export”,将看到以下窗口。 点击“JAR file”

点击Export
  • 点击“Next>”按钮继续操作。将获得另一个窗口,你需要在本地文件系统中输入路径,在其中存储jar文件。

jar export
  • 最后,单击“Finish”按钮。在指定的文件夹中,创建一个Jar文件 sample_udf.jar 此jar文件包含用Java编写的UDF。

使用UDF

在编写UDF和生成Jar文件后,请按照下面给出的步骤:

步骤1:注册Jar文件

在写入UDF(在Java中)后,我们必须使用Register运算符注册包含UDF的Jar文件。通过注册Jar文件,用户可以将UDF的位置绑定到Apache Pig。

语法

下面给出了Register运算符的语法。

REGISTER path; 

让我们注册本章前面创建的sample_udf.jar。以本地模式启动Apache Pig并注册jar文件sample_udf.jar,如下所示。

$cd PIG_HOME/bin $./pig –x local REGISTER '/$PIG_HOME/sample_udf.jar'

注意:假设路径中的Jar文件:/$PIG_HOME/sample_udf.jar

步骤2:定义别名

注册UDF后,可以使用 Define 运算符为其定义一个别名。

语法

下面给出了Define运算符的语法。

DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; 

定义sample_eval的别名,如下所示。

DEFINE sample_eval sample_eval();

步骤3:使用UDF

定义别名后,可以使用与内置函数相同的UDF。假设在HDFS /Pig_Data/ 目录中有一个名为emp_data的文件,其中包含以下内容。

001,Robin,22,newyork002,BOB,23,Kolkata003,Maya,23,Tokyo004,Sara,25,London 005,David,23,Bhuwaneshwar 006,Maggy,22,Chennai007,Robert,22,newyork008,Syam,23,Kolkata009,Mary,25,Tokyo010,Saran,25,London 011,Stacy,25,Bhuwaneshwar 012,Kelly,22,Chennai

并假设我们已将此文件加载到Pig中,如下所示。

grunt> emp_data = LOAD 'hdfs://localhost:9000/pig_data/emp1.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, city:chararray);

现在使用UDF sample_eval 将员工的姓名转换为大写。

grunt> Upper_case = FOREACH emp_data GENERATE sample_eval(name);

请验证关系 Upper_case 的内容,如下所示。

grunt> Dump Upper_case;  (ROBIN)(BOB)(MAYA)(SARA)(DAVID)(MAGGY)(ROBERT)(SYAM)(MARY)(SARAN)(STACY)(KELLY)


在本章中,我们将了解如何以批处理模式运行Apache Pig脚本。

Pig脚本中的注释

在将脚本写入文件时,我们可以在其中包含注释,如下所示。

多行注释

我们将用'/*'开始多行注释,以'*/'结束。

/* These are the multi-line comments   In the pig script */ 

单行注释

我们将用“--"开始单行注释。

--we can write single line comments like this.

在批处理模式下执行Pig脚本

在以批处理方式执行Apache Pig语句时,请按照以下步骤操作。

步骤1

将所有需要的Pig Latin语句写在单个文件中。我们可以将所有Pig Latin语句和命令写入单个文件,并将其另存为 .pig 文件。

步骤2

执行Apache Pig脚本。你可以从shell(Linux)执行Pig脚本,如下所示。

Local模式MapReduce模式

$ pig -x local Sample_script.pig

$ pig -x mapreduce Sample_script.pig

你可以使用exec命令从Grunt shell执行它,如下所示。

grunt> exec /sample_script.pig

从HDFS执行Pig脚本

我们还可以执行驻留在HDFS中的Pig脚本。假设在名为 /pig_data/ 的HDFS目录中有名为 Sample_script.pig 的Pig脚本。我们可以执行它如下所示。

$ pig -x mapreduce hdfs://localhost:9000/pig_data/Sample_script.pig 

假设在HDFS中有一个具有以下内容的文件 student_details.txt

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad 002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

我们还在同一个HDFS目录中有一个名为 sample_script.pig 的示例脚本。此文件包含对student关系执行操作和转换的语句,如下所示。

student = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);	student_order = ORDER student BY age DESC;  student_limit = LIMIT student_order 4;  Dump student_limit;
  • 脚本的第一个语句会将名为 student_details.txt 的文件中的数据加载为名为student的关系。

  • 脚本的第二个语句将根据年龄以降序排列关系的元组,并将其存储为 student_order

  • 脚本的第三个语句会将 student_order 的前4个元组存储为 student_limit

  • 最后,第四个语句将转储关系 student_limit 的内容。

现在,执行 sample_script.pig ,如下所示。

$./pig -x mapreduce hdfs://localhost:9000/pig_data/sample_script.pig

Apache Pig被执行,并提供具有以下内容的输出。

(7,Komal,Nayak,24,9848022334,trivendram)(8,Bharathi,Nambiayar,24,9848022333,Chennai) (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) (6,Archana,Mishra,23,9848022335,Chennai)2015-10-19 10:31:27,446 [main] INFO  org.apache.pig.Main - Pig script completed in 12minutes, 32 seconds and 751 milliseconds (752751 ms)


以下资源包含有关Apache Pig的其他信息。请使用它们获得有关此主题的更深入的知识。

Apache Pig上的有用链接

有用的Apache Pig书籍

  • Programming Pig
  • Hadoop For Dummies
  • Pig & Sqoop Refresher

Apache Pig是MapReduce的一个抽象。它是一个工具/平台,用于分析较大的数据集,将它们表示为数据流。Pig通常与 Hadoop 一起使用;我们可以使用Pig在Hadoop中执行所有的数据操作操作。

什么是Apache Pig?

Apache Pig是MapReduce的一个抽象。它是一个工具/平台,用于分析较大的数据集,并将它们表示为数据流。Pig通常与 Hadoop 一起使用;我们可以使用Apache Pig在Hadoop中执行所有的数据处理操作。

要编写数据分析程序,Pig提供了一种称为 Pig Latin 的高级语言。该语言提供了各种操作符,程序员可以利用它们开发自己的用于读取,写入和处理数据的功能。

要使用 Apache Pig 分析数据,程序员需要使用Pig Latin语言编写脚本。所有这些脚本都在内部转换为Map和Reduce任务。Apache Pig有一个名为 Pig Engine 的组件,它接受Pig Latin脚本作为输入,并将这些脚本转换为MapReduce作业。

为什么我们需要Apache Pig?

不太擅长Java的程序员通常习惯于使用Hadoop,特别是在执行任一MapReduce作业时。Apache Pig是所有这样的程序员的福音。

  • 使用 Pig Latin ,程序员可以轻松地执行MapReduce作业,而无需在Java中键入复杂的代码。

  • Apache Pig使用多查询方法,从而减少代码长度。例如,需要在Java中输入200行代码(LoC)的操作在Apache Pig中输入少到10个LoC就能轻松完成。最终,Apache Pig将开发时间减少了近16倍。

  • Pig Latin是类似SQL的语言,当你熟悉SQL后,很容易学习Apache Pig。

  • Apache Pig提供了许多内置操作符来支持数据操作,如join,filter,ordering等。此外,它还提供嵌套数据类型,例如tuple(元组),bag(包)和MapReduce缺少的map(映射)。

Apache Pig的特点

Apache Pig具有以下特点:

  • 丰富的运算符集 - 它提供了许多运算符来执行诸如join,sort,filer等操作。

  • 易于编程 - Pig Latin与SQL类似,如果你善于使用SQL,则很容易编写Pig脚本。

  • 优化机会 - Apache Pig中的任务自动优化其执行,因此程序员只需要关注语言的语义。

  • 可扩展性 - 使用现有的操作符,用户可以开发自己的功能来读取、处理和写入数据。

  • 用户定义函数 - Pig提供了在其他编程语言(如Java)中创建用户定义函数的功能,并且可以调用或嵌入到Pig脚本中。

  • 处理各种数据 - Apache Pig分析各种数据,无论是结构化还是非结构化,它将结果存储在HDFS中。

Apache Pig与MapReduce

下面列出的是Apache Pig和MapReduce之间的主要区别。

Apache PigMapReduce
Apache Pig是一种数据流语言。MapReduce是一种数据处理模式。
它是一种高级语言。MapReduce是低级和刚性的。
在Apache Pig中执行Join操作非常简单。在MapReduce中执行数据集之间的Join操作是非常困难的。
任何具备SQL基础知识的新手程序员都可以方便地使用Apache Pig工作。向Java公开是必须使用MapReduce。
Apache Pig使用多查询方法,从而在很大程度上减少代码的长度。MapReduce将需要几乎20倍的行数来执行相同的任务。
没有必要编译。执行时,每个Apache Pig操作符都在内部转换为MapReduce作业。MapReduce作业具有很长的编译过程。

Apache Pig Vs SQL

下面列出了Apache Pig和SQL之间的主要区别。

PigSQL
Pig Latin是一种程序语言。SQL是一种声明式语言。
在Apache Pig中,模式是可选的。我们可以存储数据而无需设计模式(值存储为$ 01,$ 02等)模式在SQL中是必需的。
Apache Pig中的数据模型是嵌套关系SQL 中使用的数据模型是平面关系
Apache Pig为查询优化提供有限的机会。在SQL中有更多的机会进行查询优化。

除了上面的区别,Apache Pig Latin:

  • 允许在pipeline(流水线)中拆分。
  • 允许开发人员在pipeline中的任何位置存储数据。
  • 声明执行计划。
  • 提供运算符来执行ETL(Extract提取,Transform转换和Load加载)功能。

Apache Pig VS Hive

Apache Pig和Hive都用于创建MapReduce作业。在某些情况下,Hive以与Apache Pig类似的方式在HDFS上运行。在下表中,我们列出了几个重要的点区分Apache Pig与Hive。

Apache PigHive
Apache Pig使用一种名为 Pig Latin 的语言(最初创建于 Yahoo )。Hive使用一种名为 HiveQL 的语言(最初创建于 Facebook )。
Pig Latin是一种数据流语言。HiveQL是一种查询处理语言。
Pig Latin是一个过程语言,它适合流水线范式。HiveQL是一种声明性语言。
Apache Pig可以处理结构化,非结构化和半结构化数据。Hive主要用于结构化数据。

Apache Pig的应用程序

Apache Pig通常被数据科学家用于执行涉及特定处理和快速原型设计的任务。使用Apache Pig:

  • 处理巨大的数据源,如Web日志。
  • 为搜索平台执行数据处理。
  • 处理时间敏感数据的加载。

Apache Pig历史

2006年 时,Apache Pig是作为Yahoo的研究项目开发的,特别是在每个数据集上创建和执行MapReduce作业。 2007 时,Apache Pig是通过Apache孵化器开源的。 2008 时,Apache Pig的第一个版本出来了。 2010 时,Apache Pig获得为Apache顶级项目。

用于使用Pig分析Hadoop中的数据的语言称为 Pig Latin ,是一种高级数据处理语言,它提供了一组丰富的数据类型和操作符来对数据执行各种操作。

要执行特定任务时,程序员使用Pig,需要用Pig Latin语言编写Pig脚本,并使用任何执行机制(Grunt Shell,UDFs,Embedded)执行它们。执行后,这些脚本将通过应用Pig框架的一系列转换来生成所需的输出。

在内部,Apache Pig将这些脚本转换为一系列MapReduce作业,因此,它使程序员的工作变得容易。Apache Pig的架构如下所示。

Apache Pig架构

Apache Pig组件

如图所示,Apache Pig框架中有各种组件。让我们来看看主要的组件。

Parser(解析器)

最初,Pig脚本由解析器处理,它检查脚本的语法,类型检查和其他杂项检查。解析器的输出将是DAG(有向无环图),它表示Pig Latin语句和逻辑运算符。在DAG中,脚本的逻辑运算符表示为节点,数据流表示为边。

Optimizer(优化器)

逻辑计划(DAG)传递到逻辑优化器,逻辑优化器执行逻辑优化,例如投影和下推。

Compiler(编译器)

编译器将优化的逻辑计划编译为一系列MapReduce作业。

Execution engine(执行引擎)

最后,MapReduce作业以排序顺序提交到Hadoop。这些MapReduce作业在Hadoop上执行,产生所需的结果。

Pig Latin数据模型

Pig Latin的数据模型是完全嵌套的,它允许复杂的非原子数据类型,例如 map tuple 下面给出了Pig Latin数据模型的图形表示。

Pig Latin数据模型

Atom(原子)

Pig Latin中的任何单个值,无论其数据类型,都称为 Atom 。它存储为字符串,可以用作字符串和数字。int,long,float,double,chararray和bytearray是Pig的原子值。一条数据或一个简单的原子值被称为字段:“raja“或“30"

Tuple(元组)

由有序字段集合形成的记录称为元组,字段可以是任何类型。元组与RDBMS表中的行类似。例:(Raja,30)

Bag(包)

一个包是一组无序的元组。换句话说,元组(非唯一)的集合被称为包。每个元组可以有任意数量的字段(灵活模式)。包由“{}"表示。它类似于RDBMS中的表,但是与RDBMS中的表不同,不需要每个元组包含相同数量的字段,或者相同位置(列)中的字段具有相同类型。

:{(Raja,30),(Mohammad,45)}

包可以是关系中的字段;在这种情况下,它被称为内包(inner bag)

:{Raja,30, {9848022338,raja@gmail.com,} }

Map(映射)

映射(或数据映射)是一组key-value对。key需要是chararray类型,且应该是唯一的。value可以是任何类型,它由“[]"表示,

:[name#Raja,age#30]

Relation(关系)

一个关系是一个元组的包。Pig Latin中的关系是无序的(不能保证按任何特定顺序处理元组)。

本章将介绍如何在系统中下载,安装和设置 Apache Pig

先决条件

在你运行Apache Pig之前,必须在系统上安装好Hadoop和Java。因此,在安装Apache Pig之前,请按照以下链接中提供的步骤安装Hadoop和Java://www.51coolma.cn/hadoop/hadoop_enviornment_setup.htm

下载Apache Pig

首先,从以下网站下载最新版本的Apache Pig:https://pig.apache.org/

步骤1

打开Apache Pig网站的主页。News部分下,点击链接release page,如下面的快照所示。

Home Page

步骤2

点击指定的链接后,你将被重定向到 Apache Pig Releases 页面。在此页面的Download部分下,单击链接,然后你将被重定向到具有一组镜像的页面。

Apache Pig Releases

步骤3

选择并单击这些镜像中的任一个,如下所示。

click mirrors

步骤4

这些镜像将带您进入 Pig Releases 页面。 此页面包含Apache Pig的各种版本。 单击其中的最新版本。

Pig Release

步骤5

在这些文件夹中,有发行版中的Apache Pig的源文件和二进制文件。下载Apache Pig 0.16, pig0.16.0-src.tar.gz pig-0.16.0.tar.gz 的源和二进制文件的tar文件。

Pig Index

安装Apache Pig

下载Apache Pig软件后,按照以下步骤将其安装在Linux环境中。

步骤1

在安装了 Hadoop,Java和其他软件的安装目录的同一目录中创建一个名为Pig的目录。(在我们的教程中,我们在名为Hadoop的用户中创建了Pig目录)。

$ mkdir Pig

第2步

提取下载的tar文件,如下所示。

$ cd Downloads/ $ tar zxvf pig-0.15.0-src.tar.gz $ tar zxvf pig-0.15.0.tar.gz 

步骤3

pig-0.16.0-src.tar.gz 文件的内容移动到之前创建的 Pig 目录,如下所示。

$ mv pig-0.16.0-src.tar.gz/* /home/Hadoop/Pig/

配置Apache Pig

安装Apache Pig后,我们必须配置它。要配置,我们需要编辑两个文件 - bashrcpig.properties

.bashrc文件

.bashrc 文件中,设置以下变量

  • PIG_HOME 文件夹复制到Apache Pig的安装文件夹

  • PATH 环境变量复制到bin文件夹

  • PIG_CLASSPATH 环境变量复制到安装Hadoop的etc(配置)文件夹(包含core-site.xml,hdfs-site.xml和mapred-site.xml文件的目录)。

export PIG_HOME = /home/Hadoop/Pigexport PATH  = PATH:/home/Hadoop/pig/binexport PIG_CLASSPATH = $HADOOP_HOME/conf

pig.properties文件

在Pig的 conf 文件夹中,我们有一个名为 pig.properties 的文件。在pig.properties文件中,可以设置如下所示的各种参数。

pig -h properties 

支持以下属性:

Logging: verbose = true|false; default is false. This property is the same as -v       switch brief=true|false; default is false. This property is the same        as -b switch debug=OFF|ERROR|WARN|INFO|DEBUG; default is INFO.                    This property is the same as -d switch aggregate.warning = true|false; default is true.        If true, prints count of warnings of each type rather than logging each warning.		 		 Performance tuning: pig.cachedbag.memusage=<mem fraction>; default is 0.2 (20% of all memory).       Note that this memory is shared across all large bags used by the application.                pig.skewedjoin.reduce.memusagea=<mem fraction>; default is 0.3 (30% of all memory).       Specifies the fraction of heap available for the reducer to perform the join.       pig.exec.nocombiner = true|false; default is false.           Only disable combiner as a temporary workaround for problems.                opt.multiquery = true|false; multiquery is on by default.           Only disable multiquery as a temporary workaround for problems.       opt.fetch=true|false; fetch is on by default.           Scripts containing Filter, Foreach, Limit, Stream, and Union can be dumped without MR jobs.                pig.tmpfilecompression = true|false; compression is off by default.                        Determines whether output of intermediate jobs is compressed.                pig.tmpfilecompression.codec = lzo|gzip; default is gzip.           Used in conjunction with pig.tmpfilecompression. Defines compression type.                pig.noSplitCombination = true|false. Split combination is on by default.           Determines if multiple small files are combined into a single map.         			         pig.exec.mapPartAgg = true|false. Default is false.                        Determines if partial aggregation is done within map phase, before records are sent to combiner.                pig.exec.mapPartAgg.minReduction=<min aggregation factor>. Default is 10.                        If the in-map partial aggregation does not reduce the output num records by this factor, it gets disabled.			  Miscellaneous: exectype = mapreduce|tez|local; default is mapreduce. This property is the same as -x switch       pig.additional.jars.uris=<comma seperated list of jars>. Used in place of register command.       udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.       stop.on.failure = true|false; default is false. Set to true to terminate on the first error.                pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.           Determines the timezone used to handle datetime datatype and UDFs.Additionally, any Hadoop property can be specified.

验证安装

通过键入version命令验证Apache Pig的安装。如果安装成功,你将获得Apache Pig的正式版本,如下所示。

$ pig –version  Apache Pig version 0.16.0 (r1682971)  compiled Jun 01 2015, 11:44:35


在上一章中,我们解释了如何安装Apache Pig。在本章中,我们将讨论如何执行Apache Pig。

Apache Pig执行模式

你可以以两种模式运行Apache Pig,即Local(本地)模式HDFS模式

Local模式

在此模式下,所有文件都从本地主机和本地文件系统安装和运行,不需要Hadoop或HDFS。此模式通常用于测试目的。

MapReduce模式

MapReduce模式是我们使用Apache Pig加载或处理Hadoop文件系统(HDFS)中存在的数据的地方。在这种模式下,每当我们执行Pig Latin语句来处理数据时,会在后端调用一个MapReduce作业,以对HDFS中存在的数据执行特定的操作。

Apache Pig执行机制

Apache Pig脚本可以通过三种方式执行,即交互模式,批处理模式和嵌入式模式。

  • 交互模式(Grunt shell) - 你可以使用Grunt shell以交互模式运行Apache Pig。在此shell中,你可以输入Pig Latin语句并获取输出(使用Dump运算符)。

  • 批处理模式(脚本) - 你可以通过将Pig Latin脚本写入具有 .pig 扩展名的单个文件中,以批处理模式运行Apache Pig。

  • 嵌入式模式(UDF) - Apache Pig允许在Java等编程语言中定义我们自己的函数(UDF用户定义函数),并在我们的脚本中使用它们。

调用Grunt Shell

你可以使用“-x"选项以所需的模式(local/MapReduce)调用Grunt shell,如下所示。

Local模式MapReduce模式

Command(命令) -

$ ./pig -x local

Command(命令)-

$ ./pig -x mapreduce

Output(输出) -

Local Mode Output

Output(输出)-

MapReduce Mode Output

这两个命令都给出了Grunt shell提示符,如下所示。

grunt>

你可以使用“ctrl+d"退出Grunt shell。

在调用Grunt shell之后,可以通过直接输入Pig中的Pig Latin语句来执行Pig脚本。

grunt> customers = LOAD 'customers.txt' USING PigStorage(',');

在批处理模式下执行Apache Pig

你可以在文件中编写整个Pig Latin脚本,并使用 -x command 执行它。我们假设在一个名为 sample_script.pig 的文件中有一个Pig脚本,如下所示。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING   PigStorage(',') as (id:int,name:chararray,city:chararray);  Dump student;

现在,你可以在上面的文件中执行脚本,如下所示。

Local模式MapReduce模式
$ pig -x local Sample_script.pig $ pig -x mapreduce Sample_script.pig

注意:我们将详细讨论如何在批处理模式嵌入模式中运行Pig脚本。

调用Grunt shell后,可以在shell中运行Pig脚本。除此之外,还有由Grunt shell提供的一些有用的shell和实用程序命令。本章讲解的是Grunt shell提供的shell和实用程序命令。

注意:在本章的某些部分中,使用了LoadStore等命令。请参阅相应章节以获取有关它们的详细信息。

Shell 命令

Apache Pig的Grunt shell主要用于编写Pig Latin脚本。在此之前,我们可以使用 sh fs 来调用任何shell命令。

sh 命令

使用 sh 命令,我们可以从Grunt shell调用任何shell命令,但无法执行作为shell环境( ex - cd)一部分的命令。

语法

下面给出了 sh 命令的语法。

grunt> sh shell command parameters

示例

我们可以使用 sh 选项从Grunt shell中调用Linux shell的 ls 命令,如下所示。在此示例中,它列出了 /pig/bin/ 目录中的文件。

grunt> sh ls   pig pig_1444799121955.log pig.cmd pig.py

fs命令

使用 fs 命令,我们可以从Grunt shell调用任何FsShell命令。

语法

下面给出了 fs 命令的语法。

grunt> sh File System command parameters

示例

我们可以使用fs命令从Grunt shell调用HDFS的ls命令。在以下示例中,它列出了HDFS根目录中的文件。

grunt> fs –ls  Found 3 itemsdrwxrwxrwx   - Hadoop supergroup          0 2015-09-08 14:13 Hbasedrwxr-xr-x   - Hadoop supergroup          0 2015-09-09 14:52 seqgen_datadrwxr-xr-x   - Hadoop supergroup          0 2015-09-08 11:30 twitter_data

以同样的方式,我们可以使用 fs 命令从Grunt shell中调用所有其他文件系统的shell命令。

实用程序命令

Grunt shell提供了一组实用程序命令。这些包括诸如clear,help,history,quitset等实用程序命令;以及Grunt shell中诸如 exec,killrun等命令来控制Pig。下面给出了Grunt shell提供的实用命令的描述。

clear命令

clear 命令用于清除Grunt shell的屏幕。

语法

你可以使用 clear 命令清除grunt shell的屏幕,如下所示。

grunt> clear

help命令

help 命令提供了Pig命令或Pig属性的列表。

使用

你可以使用 help 命令获取Pig命令列表,如下所示。

grunt> helpCommands: <pig latin statement>; - See the PigLatin manual for details:http://hadoop.apache.org/pig  File system commands:fs <fs arguments> - Equivalent to Hadoop dfs  command:http://hadoop.apache.org/common/docs/current/hdfs_shell.html	 Diagnostic Commands:describe <alias>[::<alias] - Show the schema for the alias.Inner aliases can be described as A::B.    explain [-script <pigscript>] [-out <path>] [-brief] [-dot|-xml]        [-param <param_name>=<pCram_value>]       [-param_file <file_name>] [<alias>] -        Show the execution plan to compute the alias or for entire script.       -script - Explain the entire script.       -out - Store the output into directory rather than print to stdout.       -brief - Don't expand nested plans (presenting a smaller graph for overview).       -dot - Generate the output in .dot format. Default is text format.       -xml - Generate the output in .xml format. Default is text format.       -param <param_name - See parameter substitution for details.       -param_file <file_name> - See parameter substitution for details.       alias - Alias to explain.       dump <alias> - Compute the alias and writes the results to stdout.Utility Commands: exec [-param <param_name>=param_value] [-param_file <file_name>] <script> -       Execute the script with access to grunt environment including aliases.       -param <param_name - See parameter substitution for details.       -param_file <file_name> - See parameter substitution for details.       script - Script to be executed.    run [-param <param_name>=param_value] [-param_file <file_name>] <script> -       Execute the script with access to grunt environment.		 -param <param_name - See parameter substitution for details.                -param_file <file_name> - See parameter substitution for details.       script - Script to be executed.    sh  <shell command> - Invoke a shell command.    kill <job_id> - Kill the hadoop job specified by the hadoop job id.    set <key> <value> - Provide execution parameters to Pig. Keys and values are case sensitive.       The following keys are supported:       default_parallel - Script-level reduce parallelism. Basic input size heuristics used        by default.       debug - Set debug on or off. Default is off.       job.name - Single-quoted name for jobs. Default is PigLatin:<script name>            job.priority - Priority for jobs. Values: very_low, low, normal, high, very_high.       Default is normal stream.skippath - String that contains the path.       This is used by streaming any hadoop property.    help - Display this message.    history [-n] - Display the list statements in cache.       -n Hide line numbers.    quit - Quit the grunt shell. 

history命令

此命令显示自Grunt shell被调用以来执行/使用的语句的列表。

使用

假设我们自打开Grunt shell之后执行了三个语句。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(','); grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(','); grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(','); 

然后,使用 history 命令将产生以下输出。

grunt> historycustomers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',');   orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',');   student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(','); 

set命令

set 命令用于向Pig中使用的key显示/分配值。

使用

使用此命令,可以将值设置到以下key。

Key说明和值
default_parallel通过将任何整数作为值传递给此key来设置映射作业的reducer数。
debug关闭或打开Pig中的调试功能通过传递on/off到这个key。
job.name通过将字符串值传递给此key来将作业名称设置为所需的作业。
job.priority

通过将以下值之一传递给此key来设置作业的优先级:

  • very_low
  • low
  • normal
  • high
  • very_high
stream.skippath对于流式传输,可以通过将所需的路径以字符串形式传递到此key,来设置不传输数据的路径。

quit命令

你可以使用此命令从Grunt shell退出。

使用

从Grunt shell中退出,如下所示。

grunt> quit

现在让我们看看从Grunt shell控制Apache Pig的命令。

exec命令

使用 exec 命令,我们可以从Grunt shell执行Pig脚本。

语法

下面给出了实用程序命令 exec 的语法。

grunt> exec [–param param_name = param_value] [–param_file file_name] [script]

示例

我们假设在HDFS的 /pig_data/ 目录中有一个名为 student.txt 的文件,其中包含以下内容。

Student.txt

001,Rajiv,Hyderabad002,siddarth,Kolkata003,Rajesh,Delhi

并且,假设我们在HDFS的 /pig_data/ 目录中有一个名为 sample_script.pig 的脚本文件,并具有以下内容。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING PigStorage(',')    as (id:int,name:chararray,city:chararray);  Dump student;

现在,让我们使用 exec 命令从Grunt shell中执行上面的脚本,如下所示。

grunt> exec /sample_script.pig

输出

exec 命令执行 sample_script.pig 中的脚本。按照脚本中的指示,它会将 student.txt 文件加载到Pig中,并显示Dump操作符的结果,显示以下内容。

(1,Rajiv,Hyderabad)(2,siddarth,Kolkata)(3,Rajesh,Delhi) 

kill命令

你可以使用此命令从Grunt shell中终止一个作业。

语法

下面给出了 kill 命令的语法。

grunt> kill JobId

示例

假设有一个具有id Id_0055 的正在运行的Pig作业,使用 kill 命令从Grunt shell中终止它,如下所示。

grunt> kill Id_0055

run命令

你可以使用run命令从Grunt shell运行Pig脚本

语法

下面给出了 run 命令的语法。

grunt> run [–param param_name = param_value] [–param_file file_name] script

示例

假设在HDFS的 /pig_data/ 目录中有一个名为 student.txt 的文件,其中包含以下内容。

Student.txt

001,Rajiv,Hyderabad002,siddarth,Kolkata003,Rajesh,Delhi

并且,假设我们在本地文件系统中有一个名为 sample_script.pig 的脚本文件,并具有以下内容。

Sample_script.pig

student = LOAD 'hdfs://localhost:9000/pig_data/student.txt' USING   PigStorage(',') as (id:int,name:chararray,city:chararray);

现在,让我们使用run命令从Grunt shell运行上面的脚本,如下所示。

grunt> run /sample_script.pig

你可以使用Dump操作符查看脚本的输出,如下所示。

grunt> Dump;(1,Rajiv,Hyderabad)(2,siddarth,Kolkata)(3,Rajesh,Delhi)

注意: exec run 命令之间的区别是,如果使用run,则脚本中的语句在history命令中可用。

Pig Latin是用于使用Apache Pig分析Hadoop中数据的语言。在本章中,我们将讨论Pig Latin的基础知识,如Pig Latin语句,数据类型,通用运算符,关系运算符和Pig Latin UDF。

Pig Latin - 数据模型

如前面章节所讨论的,Pig的数据模型是完全嵌套的。Relation是Pig Latin数据模型的最外层结构。它是一个其中:

  • 包是元组的集合。
  • 元组是有序的字段集。
  • 字段是一段数据。

Pig Latin - 语句

在使用Pig Latin处理数据时,语句是基本结构。

  • 这些语句使用关系(relation),它们包括表达式(expression)模式(schema)

  • 每个语句以分号(;)结尾。

  • 我们将使用Pig Latin提供的运算符通过语句执行各种操作。

  • 除了LOAD和STORE,在执行所有其他操作时,Pig Latin语句采用关系作为输入,并产生另一个关系作为输出。

  • 只要在Grunt shell中输入 Load 语句,就会执行语义检查。要查看模式的内容,需要使用 Dump 运算符。只有在执行 dump 操作后,才会执行将数据加载到文件系统的MapReduce作业。

例子

下面给出一个Pig Latin语句,它将数据加载到Apache Pig中。

grunt> Student_data = LOAD 'student_data.txt' USING PigStorage(',')as    ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

Pig Latin - 数据类型

下面给出的表描述了Pig Latin数据类型。

序号数据类型说明&示例
1int

表示有符号的32位整数。

示例:8

2long

表示有符号的64位整数。

示例:5L

3float

表示有符号的32位浮点。

示例:5.5F

4double

表示64位浮点。

示例:10.5

5chararray

表示Unicode UTF-8格式的字符数组(字符串)。

示例:‘51coolma’

6Bytearray

表示字节数组(blob)。

7Boolean

表示布尔值。

示例:true / false。

8Datetime

表示日期时间。

示例:1970-01-01T00:00:00.000 + 00:00

9Biginteger

表示Java BigInteger。

示例:60708090709

10Bigdecimal

表示Java BigDecimal

示例:185.98376256272893883

复杂类型
11Tuple

元组是有序的字段集。

示例:(raja,30)

12Bag

包是元组的集合。

示例:{(raju,30),(Mohhammad,45)}

13Map

地图是一组键值对。

示例:['name'#'Raju','age'#30]

Null值

所有上述数据类型的值可以为NULL。Apache Pig以与SQL类似的方式处理空值。null可以是未知值或不存在值,它用作可选值的占位符。这些空值可以自然出现或者可以是操作的结果。

Pig Latin - 算术运算符

下表描述了Pig Latin的算术运算符。假设a = 10和b = 20。

运算符描述示例
+

 - 运算符的两侧的值相加

a+b将得出30

 - 从运算符左边的数中减去右边的数

a-b将得出-10
*

 - 运算符两侧的值相乘

a*b将得出200
/

- 用运算符左边的数除右边的数

b / a将得出2
%

系数 - 用运算符右边的数除左边的数并返回余数

b%a将得出0
:

Bincond - 评估布尔运算符。它有三个操作数,如下所示。

变量 x =(expression)? value1 (如果为true): value2(如果为false)。

b =(a == 1)? 20:30;

如果a = 1,则b的值为20。

如果a!= 1,则b的值为30。

CASE

WHEN

THEN

ELSE

END

Case - case运算符等效于嵌套的bincond运算符。

CASE f2 % 2

WHEN  0

THEN

'even'


WHEN  1

THEN

'odd'

END

Pig Latin - 比较运算符

下表描述了Pig Latin的比较运算符。

运算符描述示例
==

等于 - 检查两个数的值是否相等;如果是,则条件变为true。

(a = b)不为true。
!=

不等于 - 检查两个数的值是否相等。如果值不相等,则条件为true。

(a!= b)为true。
>

大于 - 检查左边数的值是否大于右边数的值。 如果是,则条件变为true。

(a> b)不为true
<

小于 - 检查左边数的值是否小于右边数的值。 如果是,则条件变为true。

(a<b)为true。
>=

大于或等于 - 检查左边数的值是否大于或等于右边数的值。如果是,则条件变为true。

(a>=b)不为true。
<=

小于或等于 - 检查左边数的值是否小于或等于右边数的值。如果是,则条件变为true。

(a<=b)为true。
matches

模式匹配 - 检查左侧的字符串是否与右侧的常量匹配。

f1 matches '.* tutorial.*'

Pig Latin - 类型结构运算符

下表描述了Pig Latin的类型结构运算符。

运算符描述示例
()

元组构造函数运算符 - 此运算符用于构建元组。

(Raju,30)
{}

包构造函数运算符 - 此运算符用于构造包。

{(Raju,30),(Mohammad,45)}
[]

映射构造函数运算符 - 此运算符用于构造一个映射。

[name#Raja,age#30]

Pig Latin - 关系运算符

下表描述了Pig Latin的关系运算符。

运算符描述
加载和存储
LOAD将数据从文件系统(local/ HDFS)加载到关系中。
STORE将数据从文件系统(local/ HDFS)存储到关系中。
过滤
FILTER从关系中删除不需要的行。
DISTINCT从关系中删除重复行。
FOREACH,GENERATE基于数据列生成数据转换。
STREAM使用外部程序转换关系。
分组和连接
JOIN连接两个或多个关系。
COGROUP将数据分组为两个或多个关系。
GROUP在单个关系中对数据进行分组。
CROSS创建两个或多个关系的向量积。
排序
ORDER基于一个或多个字段(升序或降序)按排序排列关系。
LIMIT从关系中获取有限数量的元组。
合并和拆分
UNION将两个或多个关系合并为单个关系。
SPLIT将单个关系拆分为两个或多个关系。
诊断运算符
DUMP在控制台上打印关系的内容。
DESCRIBE描述关系的模式。
EXPLAIN查看逻辑,物理或MapReduce执行计划以计算关系。
ILLUSTRATE查看一系列语句的分步执行。

一般来说,Apache Pig在Hadoop之上工作。它是一种分析工具,用于分析 Hadoop File System中存在的大型数据集。要使用Apache Pig分析数据,我们必须首先将数据加载到Apache Pig中。本章介绍如何从HDFS将数据加载到Apache Pig。

准备HDFS

在MapReduce模式下,Pig从HDFS读取(加载)数据并将结果存回HDFS。因此,让我们先从HDFS开始,在HDFS中创建以下示例数据。

学生ID名字姓氏电话号码城市
001RajivReddy9848022337Hyderabad
002siddarthBattacharya9848022338Kolkata
003RajeshKhanna9848022339Delhi
004PreethiAgarwal9848022330Pune
005TrupthiMohanthy9848022336Bhuwaneshwar
006ArchanaMishra9848022335Chennai

上述数据集包含六个学生的个人详细信息,如id,名字,姓氏,电话号码和城市。

步骤1:验证Hadoop

首先,使用Hadoop version命令验证安装,如下所示。

$ hadoop version

如果你的系统里有Hadoop,并且已设置PATH变量,那么你将获得以下输出 -

Hadoop 2.6.0 Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 Compiled by jenkins on 2014-11-13T21:10Z Compiled with protoc 2.5.0 From source with checksum 18e43357c8f927c0695f1e9522859d6a This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoopcommon-2.6.0.jar

步骤2:启动HDFS

浏览Hadoop的 sbin 目录,并启动 yarn 和Hadoop dfs(分布式文件系统),如下所示。

cd /$Hadoop_Home/sbin/ $ start-dfs.sh localhost: starting namenode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-namenode-localhost.localdomain.out localhost: starting datanode, logging to /home/Hadoop/hadoop/logs/hadoopHadoop-datanode-localhost.localdomain.out Starting secondary namenodes [0.0.0.0] starting secondarynamenode, logging to /home/Hadoop/hadoop/logs/hadoop-Hadoopsecondarynamenode-localhost.localdomain.out $ start-yarn.sh starting yarn daemons starting resourcemanager, logging to /home/Hadoop/hadoop/logs/yarn-Hadoopresourcemanager-localhost.localdomain.out localhost: starting nodemanager, logging to /home/Hadoop/hadoop/logs/yarnHadoop-nodemanager-localhost.localdomain.out

步骤3:在HDFS中创建目录

在Hadoop DFS中,可以使用 mkdir 命令创建目录。在HDFS所需路径中创建一个名为 Pig_Data 的新目录,如下所示。

$cd /$Hadoop_Home/bin/ $ hdfs dfs -mkdir hdfs://localhost:9000/Pig_Data 

步骤4:将数据放在HDFS中

Pig的输入文件包含单个行中的每个元组/记录。记录的实体由分隔符分隔(在我们的示例中,我们使用“,”)。在本地文件系统中,创建一个包含数据的输入文件 student_data.txt ,如下所示。

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

现在,使用 put 命令将文件从本地文件系统移动到HDFS,如下所示。(你也可以使用 copyFromLocal 命令。)

$ cd $HADOOP_HOME/bin $ hdfs dfs -put /home/Hadoop/Pig/Pig_Data/student_data.txt dfs://localhost:9000/pig_data/

验证文件

使用 cat 命令验证文件是否已移入HDFS,如下所示。

$ cd $HADOOP_HOME/bin$ hdfs dfs -cat hdfs://localhost:9000/pig_data/student_data.txt

输出

现在,可以看到文件的内容,如下所示。

15/10/01 12:16:55 WARN util.NativeCodeLoader: Unable to load native-hadooplibrary for your platform... using builtin-java classes where applicable  001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai

Load运算符

你可以使用 Pig Latin LOAD 运算符,从文件系统(HDFS / Local)将数据加载到Apache Pig中。

语法

load语句由两部分组成,用“=”运算符分隔。在左侧,需要提到我们想要存储数据的关系的名称;而在右侧,我们需要定义如何存储数据。下面给出了 Load 运算符的语法。

Relation_name = LOAD 'Input file path' USING function as schema;

说明:

  • relation_name - 我们必须提到要存储数据的关系。

  • Input file path - 我们必须提到存储文件的HDFS目录。(在MapReduce模式下)

  • function - 我们必须从Apache Pig提供的一组加载函数中选择一个函数 BinStorage,JsonLoader,PigStorage,TextLoader 

  • Schema - 我们必须定义数据的模式,可以定义所需的模式如下 -

(column1 : data type, column2 : data type, column3 : data type);

注意:我们加载数据而不指定模式。在这种情况下,列将被寻址为$01,$02,等...(检查)。

例如,我们使用 LOAD 命令,在名为学生的模式下在Pig中的 student_data.txt 加载数据。

启动Pig Grunt Shell

首先,打开Linux终端。在MapReduce模式下启动Pig Grunt shell,如下所示。

$ Pig –x mapreduce

它将启动Pig Grunt shell,如下所示。

15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : LOCAL15/10/01 12:33:37 INFO pig.ExecTypeProvider: Trying ExecType : MAPREDUCE15/10/01 12:33:37 INFO pig.ExecTypeProvider: Picked MAPREDUCE as the ExecType2015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Apache Pig version 0.15.0 (r1682971) compiled Jun 01 2015, 11:44:352015-10-01 12:33:38,080 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/Hadoop/pig_1443683018078.log2015-10-01 12:33:38,242 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/Hadoop/.pigbootup not found  2015-10-01 12:33:39,630 [main]INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000 grunt>

执行Load语句

现在,通过在Grunt shell中执行以下Pig Latin语句,将文件 student_data.txt 中的数据加载到Pig中。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

以下是对上述说明的描述。

Relation name我们已将数据存储在学生(student)模式中。
Input file path我们从HDFS的/pig_data/目录中的 student_data.txt 文件读取数据。
Storage function我们使用了 PigStorage() 函数,将数据加载并存储为结构化文本文件。它采用分隔符,使用元组的每个实体作为参数分隔。默认情况下,它以“ "作为参数。
schema

我们已经使用以下模式存储了数据。

columnid名字姓氏电话号码城市
datatypeintchar arraychar arraychar arraychar array

注意: Load语句会简单地将数据加载到Pig的指定的关系中。要验证Load语句的执行情况,必须使用Diagnostic运算符,这将在后续的章节中讨论。

在上一章中,我们学习了如何将数据加载到Apache Pig中。你可以使用 store 运算符将加载的数据存储在文件系统中,本章介绍如何使用 Store 运算符在Apache Pig中存储数据。

语法

下面给出了Store语句的语法。

STORE Relation_name INTO ' required_directory_path ' [USING function];

假设我们在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

现在,让我们将关系存储在HDFS目录“/pig_Output/"中,如下所示。

grunt> STORE student INTO ' hdfs://localhost:9000/pig_Output/ ' USING PigStorage (',');

输出

执行 store 语句后,将获得以下输出。使用指定的名称创建目录,并将数据存储在其中。

2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - 100% complete2015-10-05 13:05:05,429 [main] INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:   HadoopVersion    PigVersion    UserId    StartedAt             FinishedAt             Features 2.6.0            0.15.0        Hadoop    2015-10-0 13:03:03    2015-10-05 13:05:05    UNKNOWN  Success!  Job Stats (time in seconds): JobId          Maps    Reduces    MaxMapTime    MinMapTime    AvgMapTime    MedianMapTime    job_14459_06    1        0           n/a           n/a           n/a           n/aMaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature        0                 0                0                0             student  MAP_ONLY OutPut folderhdfs://localhost:9000/pig_Output/  Input(s): Successfully read 0 records from: "hdfs://localhost:9000/pig_data/student_data.txt"  Output(s): Successfully stored 0 records in: "hdfs://localhost:9000/pig_Output"  Counters:Total records written : 0Total bytes written : 0Spillable Memory Manager spill count : 0 Total bags proactively spilled: 0Total records proactively spilled: 0  Job DAG: job_1443519499159_0006  2015-10-05 13:06:06,192 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - Success!

验证

你可以如下所示验证存储的数据。

步骤1

首先,使用 ls 命令列出名为 pig_output 的目录中的文件,如下所示。

hdfs dfs -ls 'hdfs://localhost:9000/pig_Output/'Found 2 itemsrw-r--r-   1 Hadoop supergroup          0 2015-10-05 13:03 hdfs://localhost:9000/pig_Output/_SUCCESSrw-r--r-   1 Hadoop supergroup        224 2015-10-05 13:03 hdfs://localhost:9000/pig_Output/part-m-00000

可以观察到在执行 store 语句后创建了两个文件。

步骤2

使用 cat 命令,列出名为 part-m-00000 的文件的内容,如下所示。

$ hdfs dfs -cat 'hdfs://localhost:9000/pig_Output/part-m-00000' 1,Rajiv,Reddy,9848022337,Hyderabad2,siddarth,Battacharya,9848022338,Kolkata3,Rajesh,Khanna,9848022339,Delhi4,Preethi,Agarwal,9848022330,Pune5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar6,Archana,Mishra,9848022335,Chennai 


Load 语句会简单地将数据加载到Apache Pig中的指定关系中。要验证Load语句的执行,必须使用Diagnostic运算符Pig Latin提供四种不同类型的诊断运算符:

  • Dump运算符
  • Describe运算符
  • Explanation运算符
  • Illustration运算符

在本章中,我们将讨论Pig Latin的Dump运算符。

Dump运算符

Dump 运算符用于运行Pig Latin语句,并在屏幕上显示结果,它通常用于调试目的。

语法

下面给出了 Dump 运算符的语法。

grunt> Dump Relation_Name

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

我们使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt'    USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray,    city:chararray );

现在,使用Dump运算符打印关系的内容,如下所示。

grunt> Dump student

一旦执行上述 Pig Latin 语句,将启动一个MapReduce作业以从HDFS读取数据,将产生以下输出。

2015-10-01 15:05:27,642 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete2015-10-01 15:05:27,652 [main]INFO  org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:   HadoopVersion  PigVersion  UserId    StartedAt             FinishedAt       Features             2.6.0          0.15.0      Hadoop  2015-10-01 15:03:11  2015-10-01 05:27     UNKNOWN                                                Success!  Job Stats (time in seconds):  JobId           job_14459_0004Maps                 1  Reduces              0  MaxMapTime          n/a    MinMapTime          n/aAvgMapTime          n/a MedianMapTime       n/aMaxReduceTime        0MinReduceTime        0  AvgReduceTime        0MedianReducetime     0Alias             student Feature           MAP_ONLY        Outputs           hdfs://localhost:9000/tmp/temp580182027/tmp757878456,Input(s): Successfully read 0 records from: "hdfs://localhost:9000/pig_data/student_data.txt"  Output(s): Successfully stored 0 records in: "hdfs://localhost:9000/tmp/temp580182027/tmp757878456"  Counters: Total records written : 0 Total bytes written : 0 Spillable Memory Manager spill count : 0Total bags proactively spilled: 0 Total records proactively spilled: 0  Job DAG: job_1443519499159_0004  2015-10-01 15:06:28,403 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLau ncher - Success!2015-10-01 15:06:28,441 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.2015-10-01 15:06:28,485 [main]INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 12015-10-01 15:06:28,485 [main]INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input pathsto process : 1(1,Rajiv,Reddy,9848022337,Hyderabad)(2,siddarth,Battacharya,9848022338,Kolkata)(3,Rajesh,Khanna,9848022339,Delhi)(4,Preethi,Agarwal,9848022330,Pune)(5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai)


describe 运算符用于查看关系的模式。

语法

describe 运算符的语法如下

grunt> Describe Relation_name

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们描述名为student的关系,并验证模式如下所示。

grunt> describe student;

输出

执行上述 Pig Latin 语句后,将生成以下输出。

grunt> student: { id: int,firstname: chararray,lastname: chararray,phone: chararray,city: chararray }


explain 运算符用于显示关系的逻辑,物理和MapReduce执行计划。

语法

下面给出了 explain 运算符的语法。

grunt> explain Relation_name;

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们使用 explain 运算符解释名为student的关系,如下所示。

grunt> explain student;

输出

它将产生以下输出。

$ explain student;2015-10-05 11:32:43,660 [main]2015-10-05 11:32:43,660 [main] INFO  org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer -{RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, ConstantCalculator,GroupByConstParallelSetter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, PartitionFilterOptimizer, PredicatePushdownOptimizer,PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter]}  #-----------------------------------------------# New Logical Plan: #-----------------------------------------------student: (Name: LOStore Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)| |---student: (Name: LOForEach Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)    |   |    |   (Name: LOGenerate[false,false,false,false,false] Schema:id#31:int,firstname#32:chararray,lastname#33:chararray,phone#34:chararray,city#35:chararray)ColumnPrune:InputUids=[34, 35, 32, 33,31]ColumnPrune:OutputUids=[34, 35, 32, 33, 31]    |   |   |     |   |   (Name: Cast Type: int Uid: 31)     |   |   |     |   |   |---id:(Name: Project Type: bytearray Uid: 31 Input: 0 Column: (*))    |   |   |         |   |   (Name: Cast Type: chararray Uid: 32)    |   |   |     |   |   |---firstname:(Name: Project Type: bytearray Uid: 32 Input: 1Column: (*))    |   |   |    |   |   (Name: Cast Type: chararray Uid: 33)    |   |   |    |   |   |---lastname:(Name: Project Type: bytearray Uid: 33 Input: 2	 Column: (*))    |   |   |     |   |   (Name: Cast Type: chararray Uid: 34)    |   |   |      |   |   |---phone:(Name: Project Type: bytearray Uid: 34 Input: 3 Column:(*))    |   |   |     |   |   (Name: Cast Type: chararray Uid: 35)    |   |   |      |   |   |---city:(Name: Project Type: bytearray Uid: 35 Input: 4 Column:(*))    |   |     |   |---(Name: LOInnerLoad[0] Schema: id#31:bytearray)    |   |      |   |---(Name: LOInnerLoad[1] Schema: firstname#32:bytearray)    |   |    |   |---(Name: LOInnerLoad[2] Schema: lastname#33:bytearray)    |   |    |   |---(Name: LOInnerLoad[3] Schema: phone#34:bytearray)    |   |     |   |---(Name: LOInnerLoad[4] Schema: city#35:bytearray)    |    |---student: (Name: LOLoad Schema: id#31:bytearray,firstname#32:bytearray,lastname#33:bytearray,phone#34:bytearray,city#35:bytearray)RequiredFields:null #-----------------------------------------------# Physical Plan: #-----------------------------------------------student: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36| |---student: New For Each(false,false,false,false,false)[bag] - scope-35    |   |    |   Cast[int] - scope-21    |   |    |   |---Project[bytearray][0] - scope-20    |   |      |   Cast[chararray] - scope-24    |   |    |   |---Project[bytearray][1] - scope-23    |   |     |   Cast[chararray] - scope-27    |   |      |   |---Project[bytearray][2] - scope-26     |   |      |   Cast[chararray] - scope-30     |   |      |   |---Project[bytearray][3] - scope-29    |   |    |   Cast[chararray] - scope-33    |   |     |   |---Project[bytearray][4] - scope-32    |     |---student: Load(hdfs://localhost:9000/pig_data/student_data.txt:PigStorage(',')) - scope192015-10-05 11:32:43,682 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false2015-10-05 11:32:43,684 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOp timizer - MR plan size before optimization: 1 2015-10-05 11:32:43,685 [main]INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOp timizer - MR plan size after optimization: 1 #--------------------------------------------------# Map Reduce Plan                                   #--------------------------------------------------MapReduce node scope-37Map Planstudent: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-36||---student: New For Each(false,false,false,false,false)[bag] - scope-35    |   |    |   Cast[int] - scope-21     |   |    |   |---Project[bytearray][0] - scope-20    |   |    |   Cast[chararray] - scope-24    |   |    |   |---Project[bytearray][1] - scope-23    |   |    |   Cast[chararray] - scope-27    |   |     |   |---Project[bytearray][2] - scope-26     |   |     |   Cast[chararray] - scope-30     |   |      |   |---Project[bytearray][3] - scope-29     |   |     |   Cast[chararray] - scope-33    |   |     |   |---Project[bytearray][4] - scope-32     |      |---student:Load(hdfs://localhost:9000/pig_data/student_data.txt:PigStorage(',')) - scope19-------- Global sort: false ---------------- 


illustrate 运算符为你提供了一系列语句的逐步执行。

语法

下面给出了illustrate运算符的语法。

grunt> illustrate Relation_name;

假设在HDFS中有一个包含以下内容的文件 student_data.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata 003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune 005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

使用LOAD运算符将它读入关系 student ,如下所示。

grunt> student = LOAD 'hdfs://localhost:9000/pig_data/student_data.txt' USING PigStorage(',')   as ( id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray );

现在,让我们说明如下所示的名为student的关系。

grunt> illustrate student;

输出

在执行上面的语句时,将获得以下输出。

grunt> illustrate student;INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapOnly$M ap - Aliasesbeing processed per job phase (AliasName[line,offset]): M: student[1,10] C:  R:---------------------------------------------------------------------------------------------|student | id:int | firstname:chararray | lastname:chararray | phone:chararray | city:chararray |--------------------------------------------------------------------------------------------- |        | 002    | siddarth            | Battacharya        | 9848022338      | Kolkata        |---------------------------------------------------------------------------------------------


GROUP 运算符用于在一个或多个关系中对数据进行分组,它收集具有相同key的数据。

语法

下面给出了 group 运算符的语法。

grunt> Group_data = GROUP Relation_name BY age;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi004,Preethi,Agarwal,21,9848022330,Pune005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar006,Archana,Mishra,23,9848022335,Chennai007,Komal,Nayak,24,9848022334,trivendram008,Bharathi,Nambiayar,24,9848022333,Chennai

将这个文件加载到Apache Pig中,关系名称为student_details,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在,让我们按照年龄关系中的记录/元组进行分组,如下所示。

grunt> group_data = GROUP student_details by age;

验证

使用 DUMP 运算符验证关系 group_data ,如下所示。

grunt> Dump group_data;

输出

将获得显示名为group_data关系的内容的输出,如下所示。在这里你可以观察到结果模式有两列:

  • 一个是age,通过它我们将关系分组。

  • 另一个是bag,其中包含一组元组,有各自年龄的学生记录。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune),(1,Rajiv,Reddy,21,9848022337,Hydera bad)})(22,{(3,Rajesh,Khanna,22,9848022339,Delhi),(2,siddarth,Battacharya,22,984802233 8,Kolkata)})(23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)})(24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)})

在使用 describe 命令分组数据后,可以看到表的模式,如下所示。

grunt> Describe group_data;  group_data: {group: int,student_details: {(id: int,firstname: chararray,               lastname: chararray,age: int,phone: chararray,city: chararray)}}

以同样的方式,可以使用illustrate命令获取模式的示例说明,如下所示。

$ Illustrate group_data;

它将产生以下输出

------------------------------------------------------------------------------------------------- |group_data|  group:int | student_details:bag{:tuple(id:int,firstname:chararray,lastname:chararray,age:int,phone:chararray,city:chararray)}|------------------------------------------------------------------------------------------------- |          |     21     | { 4, Preethi, Agarwal, 21, 9848022330, Pune), (1, Rajiv, Reddy, 21, 9848022337, Hyderabad)}| |          |     2      | {(2,siddarth,Battacharya,22,9848022338,Kolkata),(003,Rajesh,Khanna,22,9848022339,Delhi)}| -------------------------------------------------------------------------------------------------

按多列分组

让我们按年龄和城市对关系进行分组,如下所示。

grunt> group_multiple = GROUP student_details by (age, city);

可以使用Dump运算符验证名为 group_multiple 的关系的内容,如下所示。

grunt> Dump group_multiple;   ((21,Pune),{(4,Preethi,Agarwal,21,9848022330,Pune)})((21,Hyderabad),{(1,Rajiv,Reddy,21,9848022337,Hyderabad)})((22,Delhi),{(3,Rajesh,Khanna,22,9848022339,Delhi)})((22,Kolkata),{(2,siddarth,Battacharya,22,9848022338,Kolkata)})((23,Chennai),{(6,Archana,Mishra,23,9848022335,Chennai)})((23,Bhuwaneshwar),{(5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)})((24,Chennai),{(8,Bharathi,Nambiayar,24,9848022333,Chennai)})(24,trivendram),{(7,Komal,Nayak,24,9848022334,trivendram)})

Group All

你可以按所有的列对关系进行分组,如下所示。

grunt> group_all = GROUP student_details All;

现在,请验证关系 group_all 的内容,如下所示。

grunt> Dump group_all;    (all,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334 ,trivendram), (6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336,Bhuw aneshwar), (4,Preethi,Agarwal,21,9848022330,Pune),(3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata),(1,Rajiv,Reddy,21,9848022337,Hyd erabad)})


COGROUP 运算符的运作方式与 GROUP 运算符相同。两个运算符之间的唯一区别是 group 运算符通常用于一个关系,而 cogroup 运算符用于涉及两个或多个关系的语句。

使用Cogroup分组两个关系

假设在HDFS目录 /pig_data/ 中有两个文件,即 student_details.txt employee_details.txt ,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi004,Preethi,Agarwal,21,9848022330,Pune005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar006,Archana,Mishra,23,9848022335,Chennai007,Komal,Nayak,24,9848022334,trivendram008,Bharathi,Nambiayar,24,9848022333,Chennai

employee_details.txt

001,Robin,22,newyork 002,BOB,23,Kolkata 003,Maya,23,Tokyo 004,Sara,25,London 005,David,23,Bhuwaneshwar 006,Maggy,22,Chennai

将这些文件分别加载到Pig中,关系名称分别为 student_details employee_details ,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);   grunt> employee_details = LOAD 'hdfs://localhost:9000/pig_data/employee_details.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, city:chararray);

现在,将 student_details employee_details 关系的记录/元组按关键字age进行分组,如下所示。

grunt> cogroup_data = COGROUP student_details by age, employee_details by age;

验证

使用 DUMP 运算符验证关系 cogroup_data ,如下所示。

grunt> Dump cogroup_data;

输出

它将产生以下输出,显示名为 cogroup_data 的关系的内容,如下所示。

(21,{(4,Preethi,Agarwal,21,9848022330,Pune), (1,Rajiv,Reddy,21,9848022337,Hyderabad)},    {    })  (22,{ (3,Rajesh,Khanna,22,9848022339,Delhi), (2,siddarth,Battacharya,22,9848022338,Kolkata) },     { (6,Maggy,22,Chennai),(1,Robin,22,newyork) })  (23,{(6,Archana,Mishra,23,9848022335,Chennai),(5,Trupthi,Mohanthy,23,9848022336 ,Bhuwaneshwar)},    {(5,David,23,Bhuwaneshwar),(3,Maya,23,Tokyo),(2,BOB,23,Kolkata)}) (24,{(8,Bharathi,Nambiayar,24,9848022333,Chennai),(7,Komal,Nayak,24,9848022334, trivendram)},    { })  (25,{   },    {(4,Sara,25,London)})

cogroup 运算符根据年龄对来自每个关系的元组进行分组,其中每个组描述特定的年龄值。

例如,如果我们考虑结果的第一个元组,它按照年龄21分组,那它包含两个包

  • 第一个包保存了具有21岁的第一关系(在这种情况下是 student_details )的所有元组;

  • 第二个包具有第二关系(在这种情况下为 employee_details )的所有元组,其年龄为21岁。

如果关系不具有年龄值为21的元组,则返回一个空包。

JOIN 运算符用于组合来自两个或多个关系的记录。在执行连接操作时,我们从每个关系中声明一个(或一组)元组作为key。 当这些key匹配时,两个特定的元组匹配,否则记录将被丢弃。连接可以是以下类型:

  • Self-join
  • Inner-join
  • Outer-join − left join, right join, and full join

本章介绍了如何在Pig Latin中使用join运算符的示例。假设在HDFS的 /pig_data/ 目录中有两个文件,即 customers.txt orders.txt ,如下所示。

customers.txt

1,Ramesh,32,Ahmedabad,2000.002,Khilan,25,Delhi,1500.003,kaushik,23,Kota,2000.004,Chaitali,25,Mumbai,6500.00 5,Hardik,27,Bhopal,8500.006,Komal,22,MP,4500.007,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000100,2009-10-08 00:00:00,3,1500101,2009-11-20 00:00:00,2,1560103,2008-05-20 00:00:00,4,2060

我们将这两个文件 customers 和 orders 关系一起加载到Pig中,如下所示。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',')   as (oid:int, date:chararray, customer_id:int, amount:int);

现在让我们对这两个关系执行各种连接操作。

Self-join(自连接)

Self-join 用于将表与其自身连接,就像表是两个关系一样,临时重命名至少一个关系。通常,在Apache Pig中,为了执行self-join,我们将在不同的别名(名称)下多次加载相同的数据。那么,将文件 customers.txt 的内容加载为两个表,如下所示。

grunt> customers1 = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> customers2 = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int); 

语法

下面给出使用 JOIN 运算符执行self-join操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY key, Relation2_name BY key ;

通过如图所示加入两个关系 customers1 customers2 ,对关系 customers 执行self-join 操作

grunt> customers3 = JOIN customers1 BY id, customers2 BY id;

验证

使用 DUMP 运算符验证关系 customers3 ,如下所示。

grunt> Dump customers3;

输出

将产生以下输出,显示关系 customers 的内容。

(1,Ramesh,32,Ahmedabad,2000,1,Ramesh,32,Ahmedabad,2000)(2,Khilan,25,Delhi,1500,2,Khilan,25,Delhi,1500)(3,kaushik,23,Kota,2000,3,kaushik,23,Kota,2000)(4,Chaitali,25,Mumbai,6500,4,Chaitali,25,Mumbai,6500)(5,Hardik,27,Bhopal,8500,5,Hardik,27,Bhopal,8500)(6,Komal,22,MP,4500,6,Komal,22,MP,4500)(7,Muffy,24,Indore,10000,7,Muffy,24,Indore,10000)

Inner Join(内部连接)

Inner Join使用较为频繁;它也被称为等值连接当两个表中都存在匹配时,内部连接将返回行。基于连接谓词(join-predicate),通过组合两个关系(例如A和B)的列值来创建新关系。查询将A的每一行与B的每一行进行比较,以查找满足连接谓词的所有行对。当连接谓词被满足时,A和B的每个匹配的行对的列值被组合成结果行。

语法

以下是使用 JOIN 运算符执行inner join操作的语法。

grunt> result = JOIN relation1 BY columnname, relation2 BY columnname;

让我们对customersorders执行inner join操作,如下所示。

grunt> coustomer_orders = JOIN customers BY id, orders BY customer_id;

验证

使用 DUMP 运算符验证 coustomer_orders 关系,如下所示。

grunt> Dump coustomer_orders;

输出

将获得以下输出,是名为 coustomer_orders 的关系的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

注意

Outer Join:与inner join不同,outer join返回至少一个关系中的所有行。outer join操作以三种方式执行:

  • Left outer join
  • Right outer join
  • Full outer join

Left Outer Join(左外连接)

left outer join操作返回左表中的所有行,即使右边的关系中没有匹配项

语法

下面给出使用 JOIN 运算符执行left outer join操作的语法。

grunt> Relation3_name = JOIN Relation1_name BY id LEFT OUTER, Relation2_name BY customer_id;

让我们对customers和orders的两个关系执行left outer join操作,如下所示。

grunt> outer_left = JOIN customers BY id LEFT OUTER, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_left ,如下所示。

grunt> Dump outer_left;

输出

它将产生以下输出,显示关系 outer_left 的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)(5,Hardik,27,Bhopal,8500,,,,)(6,Komal,22,MP,4500,,,,)(7,Muffy,24,Indore,10000,,,,) 

Right Outer Join(右外连接)

right outer join操作将返回右表中的所有行,即使左表中没有匹配项。

语法

下面给出使用 JOIN 运算符执行right outer join操作的语法。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

让我们对customersorders执行right outer join操作,如下所示。

grunt> outer_right = JOIN customers BY id RIGHT, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_right ,如下所示。

grunt> Dump outer_right

输出

它将产生以下输出,显示关系 outer_right 的内容。

(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)

Full Outer Join(全外连接)

当一个关系中存在匹配时,full outer join操作将返回行。

语法

下面给出使用 JOIN 运算符执行full outer join的语法。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

让我们对customersorders执行full outer join操作,如下所示。

grunt> outer_full = JOIN customers BY id FULL OUTER, orders BY customer_id;

验证

使用 DUMP 运算符验证关系 outer_full ,如下所示。

grun> Dump outer_full; 

输出

它将产生以下输出,显示关系 outer_full 的内容。

(1,Ramesh,32,Ahmedabad,2000,,,,)(2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560)(3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500)(3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000)(4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060)(5,Hardik,27,Bhopal,8500,,,,)(6,Komal,22,MP,4500,,,,)(7,Muffy,24,Indore,10000,,,,)

使用多个Key

我们可以使用多个key执行JOIN操作。

语法

下面是如何使用多个key对两个表执行JOIN操作。

grunt> Relation3_name = JOIN Relation2_name BY (key1, key2), Relation3_name BY (key1, key2);

假设在HDFS的 /pig_data/ 目录中有两个文件,即 employee.txt employee_contact.txt ,如下所示。

employee.txt

001,Rajiv,Reddy,21,programmer,003002,siddarth,Battacharya,22,programmer,003003,Rajesh,Khanna,22,programmer,003004,Preethi,Agarwal,21,programmer,003005,Trupthi,Mohanthy,23,programmer,003006,Archana,Mishra,23,programmer,003007,Komal,Nayak,24,teamlead,002008,Bharathi,Nambiayar,24,manager,001

employee_contact.txt

001,9848022337,Rajiv@gmail.com,Hyderabad,003002,9848022338,siddarth@gmail.com,Kolkata,003003,9848022339,Rajesh@gmail.com,Delhi,003004,9848022330,Preethi@gmail.com,Pune,003005,9848022336,Trupthi@gmail.com,Bhuwaneshwar,003006,9848022335,Archana@gmail.com,Chennai,003007,9848022334,Komal@gmail.com,trivendram,002008,9848022333,Bharathi@gmail.com,Chennai,001

将这两个文件加载到Pig中,通过关系 employee employee_contact ,如下所示。

grunt> employee = LOAD 'hdfs://localhost:9000/pig_data/employee.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, designation:chararray, jobid:int);  grunt> employee_contact = LOAD 'hdfs://localhost:9000/pig_data/employee_contact.txt' USING PigStorage(',')    as (id:int, phone:chararray, email:chararray, city:chararray, jobid:int);

现在,让我们使用 JOIN 运算符连接这两个关系的内容,如下所示。

grunt> emp = JOIN employee BY (id,jobid), employee_contact BY (id,jobid);

验证

使用 DUMP 运算符验证关系 emp ,如下所示。

grunt> Dump emp; 

输出

它将产生以下输出,显示名为 emp 的关系的内容,如下所示。

(1,Rajiv,Reddy,21,programmer,113,1,9848022337,Rajiv@gmail.com,Hyderabad,113)(2,siddarth,Battacharya,22,programmer,113,2,9848022338,siddarth@gmail.com,Kolka ta,113)  (3,Rajesh,Khanna,22,programmer,113,3,9848022339,Rajesh@gmail.com,Delhi,113)  (4,Preethi,Agarwal,21,programmer,113,4,9848022330,Preethi@gmail.com,Pune,113)  (5,Trupthi,Mohanthy,23,programmer,113,5,9848022336,Trupthi@gmail.com,Bhuwaneshw ar,113)  (6,Archana,Mishra,23,programmer,113,6,9848022335,Archana@gmail.com,Chennai,113)  (7,Komal,Nayak,24,teamlead,112,7,9848022334,Komal@gmail.com,trivendram,112)  (8,Bharathi,Nambiayar,24,manager,111,8,9848022333,Bharathi@gmail.com,Chennai,111)


CROSS 运算符计算两个或多个关系的向量积。本章将以示例说明如何在Pig Latin中使用cross运算符。

语法

下面给出了 CROSS 运算符的语法。

grunt> Relation3_name = CROSS Relation1_name, Relation2_name;

假设在HDFS的 /pig_data/ 目录中有两个文件,即 customers.txt orders.txt ,如下所示。

customers.txt

1,Ramesh,32,Ahmedabad,2000.002,Khilan,25,Delhi,1500.003,kaushik,23,Kota,2000.004,Chaitali,25,Mumbai,6500.005,Hardik,27,Bhopal,8500.006,Komal,22,MP,4500.007,Muffy,24,Indore,10000.00

orders.txt

102,2009-10-08 00:00:00,3,3000100,2009-10-08 00:00:00,3,1500101,2009-11-20 00:00:00,2,1560103,2008-05-20 00:00:00,4,2060

将这两个文件加载到Pig中,通过关系 customers  orders,如下所示。

grunt> customers = LOAD 'hdfs://localhost:9000/pig_data/customers.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, address:chararray, salary:int);  grunt> orders = LOAD 'hdfs://localhost:9000/pig_data/orders.txt' USING PigStorage(',')   as (oid:int, date:chararray, customer_id:int, amount:int);

现在让我们使用 cross 运算符获得这两个关系的向量积,如下所示。

grunt> cross_data = CROSS customers, orders;

验证

使用 DUMP 运算符验证关系 cross_data ,如下所示。

grunt> Dump cross_data;

输出

它将产生以下输出,显示关系 cross_data 的内容。

(7,Muffy,24,Indore,10000,103,2008-05-20 00:00:00,4,2060) (7,Muffy,24,Indore,10000,101,2009-11-20 00:00:00,2,1560) (7,Muffy,24,Indore,10000,100,2009-10-08 00:00:00,3,1500) (7,Muffy,24,Indore,10000,102,2009-10-08 00:00:00,3,3000) (6,Komal,22,MP,4500,103,2008-05-20 00:00:00,4,2060) (6,Komal,22,MP,4500,101,2009-11-20 00:00:00,2,1560) (6,Komal,22,MP,4500,100,2009-10-08 00:00:00,3,1500) (6,Komal,22,MP,4500,102,2009-10-08 00:00:00,3,3000) (5,Hardik,27,Bhopal,8500,103,2008-05-20 00:00:00,4,2060) (5,Hardik,27,Bhopal,8500,101,2009-11-20 00:00:00,2,1560) (5,Hardik,27,Bhopal,8500,100,2009-10-08 00:00:00,3,1500) (5,Hardik,27,Bhopal,8500,102,2009-10-08 00:00:00,3,3000) (4,Chaitali,25,Mumbai,6500,103,2008-05-20 00:00:00,4,2060) (4,Chaitali,25,Mumbai,6500,101,2009-20 00:00:00,4,2060) (2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) (2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500) (2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) (1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) (1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) (1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) (1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)-11-20 00:00:00,2,1560) (4,Chaitali,25,Mumbai,6500,100,2009-10-08 00:00:00,3,1500) (4,Chaitali,25,Mumbai,6500,102,2009-10-08 00:00:00,3,3000) (3,kaushik,23,Kota,2000,103,2008-05-20 00:00:00,4,2060) (3,kaushik,23,Kota,2000,101,2009-11-20 00:00:00,2,1560) (3,kaushik,23,Kota,2000,100,2009-10-08 00:00:00,3,1500) (3,kaushik,23,Kota,2000,102,2009-10-08 00:00:00,3,3000) (2,Khilan,25,Delhi,1500,103,2008-05-20 00:00:00,4,2060) (2,Khilan,25,Delhi,1500,101,2009-11-20 00:00:00,2,1560) (2,Khilan,25,Delhi,1500,100,2009-10-08 00:00:00,3,1500)(2,Khilan,25,Delhi,1500,102,2009-10-08 00:00:00,3,3000) (1,Ramesh,32,Ahmedabad,2000,103,2008-05-20 00:00:00,4,2060) (1,Ramesh,32,Ahmedabad,2000,101,2009-11-20 00:00:00,2,1560) (1,Ramesh,32,Ahmedabad,2000,100,2009-10-08 00:00:00,3,1500) (1,Ramesh,32,Ahmedabad,2000,102,2009-10-08 00:00:00,3,3000)  


Pig Latin的 UNION 运算符用于合并两个关系的内容。要对两个关系执行UNION操作,它们的列和域必须相同。

语法

下面给出了 UNION 运算符的语法。

grunt> Relation_name3 = UNION Relation_name1, Relation_name2;

假设在HDFS的 /pig_data/ 目录中有两个文件,即 student_data1.txt student_data2.txt ,如下所示。

Student_data1.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata003,Rajesh,Khanna,9848022339,Delhi004,Preethi,Agarwal,9848022330,Pune005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai.

Student_data2.txt

7,Komal,Nayak,9848022334,trivendram.8,Bharathi,Nambiayar,9848022333,Chennai.

将这两个文件加载到Pig中,通过关系 student1 student2 ,如下所示。

grunt> student1 = LOAD 'hdfs://localhost:9000/pig_data/student_data1.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);  grunt> student2 = LOAD 'hdfs://localhost:9000/pig_data/student_data2.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用 UNION 运算符合并这两个关系的内容,如下所示。

grunt> student = UNION student1, student2;

验证

使用 DUMP 运算子验证关系student,如下所示。

grunt> Dump student; 

输出

它将显示以下输出,显示关系student的内容。

(1,Rajiv,Reddy,9848022337,Hyderabad) (2,siddarth,Battacharya,9848022338,Kolkata)(3,Rajesh,Khanna,9848022339,Delhi)(4,Preethi,Agarwal,9848022330,Pune) (5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai) (7,Komal,Nayak,9848022334,trivendram) (8,Bharathi,Nambiayar,9848022333,Chennai)


SPLIT 运算符用于将关系拆分为两个或多个关系。

语法

下面给出了 SPLIT 运算符的语法。

grunt> SPLIT Relation1_name INTO Relation2_name IF (condition1), Relation2_name (condition2),

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到Pig中,如下所示。

student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray); 

现在,让我们将关系分为两个,一个列出年龄小于23岁的员工,另一个列出年龄在22到25岁之间的员工。

SPLIT student_details into student_details1 if age<23, student_details2 if (22<age and age>25);

验证

使用 DUMP 操作符验证关系 student_details1 student_details2 ,如下所示。

grunt> Dump student_details1;  grunt> Dump student_details2; 

输出

它将产生以下输出,分别显示关系 student_details1 student_details2 的内容。

grunt> Dump student_details1; (1,Rajiv,Reddy,21,9848022337,Hyderabad) (2,siddarth,Battacharya,22,9848022338,Kolkata)(3,Rajesh,Khanna,22,9848022339,Delhi) (4,Preethi,Agarwal,21,9848022330,Pune)  grunt> Dump student_details2; (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) (6,Archana,Mishra,23,9848022335,Chennai) (7,Komal,Nayak,24,9848022334,trivendram) (8,Bharathi,Nambiayar,24,9848022333,Chennai)


FILTER 运算符用于根据条件从关系中选择所需的元组。

语法

下面给出了 FILTER 运算符的语法。

grunt> Relation2_name = FILTER Relation1_name BY (condition);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

将此文件通过关系 student_details 加载到 的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, age:int, phone:chararray, city:chararray);

现在使用Filter运算符来获取属于Chennai城市的学生的详细信息。

filter_data = FILTER student_details BY city == 'Chennai';

验证

使用 DUMP 运算符验证关系 filter_data ,如下所示。

grunt> Dump filter_data;

输出

它将产生以下输出,显示关系 filter_data 的内容如下。

(6,Archana,Mishra,23,9848022335,Chennai)(8,Bharathi,Nambiayar,24,9848022333,Chennai)


DISTINCT 运算符用于从关系中删除冗余(重复)元组。

语法

下面给出了 DISTINCT 运算符的语法。

grunt> Relation_name2 = DISTINCT Relatin_name1;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,9848022337,Hyderabad002,siddarth,Battacharya,9848022338,Kolkata 002,siddarth,Battacharya,9848022338,Kolkata 003,Rajesh,Khanna,9848022339,Delhi 003,Rajesh,Khanna,9848022339,Delhi 004,Preethi,Agarwal,9848022330,Pune 005,Trupthi,Mohanthy,9848022336,Bhuwaneshwar006,Archana,Mishra,9848022335,Chennai 006,Archana,Mishra,9848022335,Chennai

通过关系 student_details 将此文件加载到Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')    as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);

现在,让我们使用 DISTINCT 运算符从 student_details 关系中删除冗余(重复)元组,并将其另存在一个名为 distinct_data 的关系 如下所示。

grunt> distinct_data = DISTINCT student_details;

验证

使用 DUMP 运算符验证关系 distinct_data ,如下所示。

grunt> Dump distinct_data;

输出

它将产生以下输出,显示关系 distinct_data 的内容如下。

(1,Rajiv,Reddy,9848022337,Hyderabad)(2,siddarth,Battacharya,9848022338,Kolkata) (3,Rajesh,Khanna,9848022339,Delhi) (4,Preethi,Agarwal,9848022330,Pune) (5,Trupthi,Mohanthy,9848022336,Bhuwaneshwar)(6,Archana,Mishra,9848022335,Chennai)


FOREACH 运算符用于基于列数据生成指定的数据转换。

语法

下面给出了 FOREACH 运算符的语法。

grunt> Relation_name2 = FOREACH Relatin_name1 GENERATE (required data);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在让我们从关系 student_details 中获取每个学生的id,age和city值,并使用 foreach 运算符将它存储到另一个名为 foreach_data  关系,如下所示。

grunt> foreach_data = FOREACH student_details GENERATE id,age,city;

验证

使用 DUMP 运算符验证关系 foreach_data ,如下所示。

grunt> Dump foreach_data;

输出

它将产生以下输出,显示关系 foreach_data 的内容。

(1,21,Hyderabad)(2,22,Kolkata)(3,22,Delhi)(4,21,Pune) (5,23,Bhuwaneshwar)(6,23,Chennai) (7,24,trivendram)(8,24,Chennai) 


ORDER BY 运算符用于以基于一个或多个字段的排序顺序显示关系的内容。

语法

下面给出了 ORDER BY 运算符的语法。

grunt> Relation_name2 = ORDER Relatin_name1 BY (ASC|DESC);

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在让我们根据学生的年龄以降序排列关系,并使用 ORDER BY 运算符将它存储到另一个名为 order_by_data 的关系中,如下所示。

grunt> order_by_data = ORDER student_details BY age DESC;

验证

使用 DUMP 运算符验证关系 order_by_data ,如下所示。

grunt> Dump order_by_data; 

输出

它将产生以下输出,显示关系 order_by_data 的内容。

(8,Bharathi,Nambiayar,24,9848022333,Chennai)(7,Komal,Nayak,24,9848022334,trivendram)(6,Archana,Mishra,23,9848022335,Chennai) (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar)(3,Rajesh,Khanna,22,9848022339,Delhi) (2,siddarth,Battacharya,22,9848022338,Kolkata)(4,Preethi,Agarwal,21,9848022330,Pune) (1,Rajiv,Reddy,21,9848022337,Hyderabad)


LIMIT 运算符用于从关系中获取有限数量的元组。

语法

下面给出了 LIMIT 运算符的语法。

grunt> Result = LIMIT Relation_name required number of tuples;

假设在HDFS目录 /pig_data/ 中有一个名为 student_details.txt 的文件,如下所示。

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

通过关系 student_details 将此文件加载到的Pig中,如下所示。

grunt> student_details = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray,age:int, phone:chararray, city:chararray);

现在,让我们根据学生的年龄以降序对关系进行排序,并使用 ORDER BY 运算符将其存储到另一个名为 limit_data 的关系中,如下所示。

grunt> limit_data = LIMIT student_details 4; 

验证

使用 DUMP 运算符验证关系 limit_data ,如下所示。

grunt> Dump limit_data; 

输出

它将产生以下输出,显示关系 limit_data 的内容如下。

(1,Rajiv,Reddy,21,9848022337,Hyderabad) (2,siddarth,Battacharya,22,9848022338,Kolkata) (3,Rajesh,Khanna,22,9848022339,Delhi) (4,Preethi,Agarwal,21,9848022330,Pune) 


Apache Pig提供了各种内置函数,即 eval,load,store,math,string,bag tuple 函数。

Eval函数

下面给出了Apache Pig提供的 eval 函数列表。

S.N.函数 & 描述
1AVG()

计算包内数值的平均值。

2BagToString()

将包的元素连接成字符串。在连接时,我们可以在这些值之间放置分隔符(可选)。

3CONCAT()

连接两个或多个相同类型的表达式。

4COUNT()

获取包中元素的数量,同时计算包中元组的数量。

5COUNT_STAR()

它类似于 COUNT() 函数。 它用于获取包中的元素数量。

6DIFF()

比较元组中的两个包(字段)。

7IsEmpty()

检查包或映射是否为空。

8MAX()

计算单列包中的列(数值或字符)的最大值。

9MIN()

要获取单列包中特定列的最小(最低)值(数字或字符)。

10PluckTuple()

使用Pig Latin的  PluckTuple() 函数,可以定义字符串Prefix,并过滤以给定prefix开头的关系中的列。

11SIZE()

基于任何Pig数据类型计算元素的数量。

12SUBTRACT()

两个包相减, 它需要两个包作为输入,并返回包含第一个包中不在第二个包中的元组的包。

13SUM()

要获取单列包中某列的数值总和。

14TOKENIZE()

要在单个元组中拆分字符串(其中包含一组字),并返回包含拆分操作的输出的包。


Apache Pig中的加载存储函数用于确定数据如何从Pig中弹出。这些函数与加载和存储运算符一起使用。下面给出了Pig中可用的加载和存储函数的列表。

S.N.函数 & 描述
1PigStorage()

加载和存储结构化文件。

2TextLoader()

将非结构化数据加载到Pig中。

3BinStorage()

使用机器可读格式将数据加载并存储到Pig中。

4Handling Compression

在Pig Latin中,我们可以加载和存储压缩数据。


下面给出了Bag和Tuple函数的列表。

S.N.函数 & 描述
1TOBAG()

将两个或多个表达式转换为包。

2TOP()

获取关系的顶部 N 个元组。

3TOTUPLE()

将一个或多个表达式转换为元组。

4TOMAP()

将key-value对转换为Map。


在Apache Pig中有以下String函数。

S.N.函数 & 描述
1ENDSWITH(string, testAgainst)

验证给定字符串是否以特定子字符串结尾。

2STARTSWITH(string, substring)

接受两个字符串参数,并验证第一个字符串是否以第二个字符串开头。

3SUBSTRING(string, startIndex, stopIndex)

返回来自给定字符串的子字符串。

4EqualsIgnoreCase(string1, string2)

比较两个字符串,忽略大小写。

5INDEXOF(string, ‘character’, startIndex)

返回字符串中第一个出现的字符,从开始索引向前搜索。

6LAST_INDEX_OF(expression)

返回字符串中最后一次出现的字符的索引,从开始索引向后搜索。

7LCFIRST(expression)

将字符串中的第一个字符转换为小写。

8UCFIRST(expression)

返回一个字符串,其中第一个字符转换为大写。

9UPPER(expression)

返回转换为大写的字符串。

10LOWER(expression)

将字符串中的所有字符转换为小写。

11REPLACE(string, ‘oldChar’, ‘newChar’);

使用新字符替换字符串中的现有字符。

12STRSPLIT(string, regex, limit)

围绕给定正则表达式的匹配拆分字符串。

13STRSPLITTOBAG(string, regex, limit)

STRSPLIT() 函数类似,它通过给定的分隔符将字符串拆分,并将结果返回到包中。

14TRIM(expression)

返回删除了前端和尾部空格的字符串的副本。

15LTRIM(expression)

返回删除了前端空格的字符串的副本。

16RTRIM(expression)

返回已删除尾部空格的字符串的副本。


Apache Pig提供以下日期和时间函数 -

S.N.函数 & 描述
1ToDate(milliseconds)

此函数根据给定的参数返回日期时间对象。此函数的另一个替代方法是ToDate(iosstring),ToDate(userstring,format),ToDate(userstring,format,timezone)

2CurrentTime()

返回当前时间的日期时间对象。

3GetDay(datetime)

从日期时间对象返回一个月中的某一天。

4GetHour(datetime)

从日期时间对象返回一天中的小时。

5GetMilliSecond(datetime)

从日期时间对象返回秒中的毫秒。

6GetMinute(datetime)

从日期时间对象返回一小时中的分钟。

7GetMonth(datetime)

从日期时间对象返回一年中的月份。

8GetSecond(datetime)

从日期时间对象返回一分钟的秒。

9GetWeek(datetime)

从日期时间对象返回一年中的周。

10GetWeekYear(datetime)

从日期时间对象返回周年。

11GetYear(datetime)

从日期时间对象返回年份。

12AddDuration(datetime, duration)

返回日期时间对象的结果以及持续时间对象。

13SubtractDuration(datetime, duration)

从Date-Time对象中减去Duration对象并返回结果。

14DaysBetween(datetime1, datetime2)

返回两个日期时间对象之间的天数。

15HoursBetween(datetime1, datetime2)

返回两个日期时间对象之间的小时数。

16MilliSecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的毫秒数。

17MinutesBetween(datetime1, datetime2)

返回两个日期时间对象之间的分钟数。

18MonthsBetween(datetime1, datetime2)

返回两个日期时间对象之间的月数。

19SecondsBetween(datetime1, datetime2)

返回两个日期时间对象之间的秒数。

20WeeksBetween(datetime1, datetime2)

返回两个日期时间对象之间的周数。

21YearsBetween(datetime1, datetime2)

返回两个日期时间对象之间的年数。


我们在Apache Pig中有以下Math(数学)函数:

S.N.函数 & 描述
1ABS(expression)

获取表达式的绝对值。

2ACOS(expression)

获得表达式的反余弦值。

3ASIN(expression)

获取表达式的反正弦值。

4ATAN(expression)

此函数用于获取表达式的反正切值。

5CBRT(expression)

此函数用于获取表达式的立方根。

6CEIL(expression)

此函数用于获取向上舍入到最接近的整数的表达式的值(近1取整)。

7COS(expression)

此函数用于获取表达式的三角余弦值。

8COSH(expression)

此函数用于获取表达式的双曲余弦值。

9EXP(expression)

此函数用于获得欧拉数e乘以x的幂,即指数。

10FLOOR(expression)

要获得向下取整为最接近整数的表达式的值(四舍五入取整)。

11LOG(expression)

获得表达式的自然对数(基于e)。

12LOG10(expression)

得到表达式的基于10的对数。

13RANDOM( )

获得大于或等于0.0且小于1.0的伪随机数(double类型)。

14ROUND(expression)

要将表达式的值四舍五入为整数(如果结果类型为float)或四舍五入为长整型(如果结果类型为double)。

15SIN(expression)

获得表达式的正弦值。

16SINH(expression)

获得表达式的双曲正弦值。

17SQRT(expression)

获得表达式的正平方根。

18TAN(expression)

获得角度的三角正切。

19TANH(expression)

获得表达式的双曲正切。


除了内置函数之外,Apache Pig还为 User Defined Function(UDF:用户定义函数)提供广泛的支持。使用这些UDF,可以定义我们自己的函数并使用它们。UDF支持六种编程语言,即Java,Jython,Python,JavaScript,Ruby和Groovy。

对于编写UDF,在Java中提供全面的支持,并在所有其他语言中提供有限的支持。使用Java,你可以编写涉及处理的所有部分的UDF,如数据加载/存储,列转换和聚合。由于Apache Pig是用Java编写的,因此与其他语言相比,使用Java语言编写的UDF工作效率更高。

在Apache Pig中,我们还有一个用于UDF名为 Piggybank 的Java存储库。使用Piggybank,我们可以访问由其他用户编写的Java UDF,并贡献我们自己的UDF。

Java中的UDF的类型

在使用Java编写UDF时,我们可以创建和使用以下三种类型的函数

  • Filter函数 - Filter(过滤)函数用作过滤器语句中的条件。这些函数接受Pig值作为输入并返回布尔值。

  • Eval函数 - Eval函数在FOREACH-GENERATE语句中使用。这些函数接受Pig值作为输入并返回Pig结果。

  • Algebraic函数 - Algebraic(代数)函数对FOREACHGENERATE语句中的内包起作用。这些函数用于对内包执行完全MapReduce操作。

使用Java编写UDF

要使用Java编写UDF,我们必须集成jar文件 Pig-0.15.0.jar 在本章节中,将讨论如何使用Eclipse编写示例UDF。在继续学习前,请确保你已在系统中安装了Eclipse和Maven。

按照下面给出的步骤写一个UDF函数:

  • 打开Eclipse并创建一个新项目(例如 myproject )。

  • 将新创建的项目转换为Maven项目。

  • 在pom.xml中复制以下内容。此文件包含Apache Pig和Hadoop-core jar文件的Maven依赖关系。

<project xmlns = "http://maven.apache.org/POM/4.0.0"   xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"   xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0http://maven.apache .org/xsd/maven-4.0.0.xsd"> 	   <modelVersion>4.0.0</modelVersion>    <groupId>Pig_Udf</groupId>    <artifactId>Pig_Udf</artifactId>    <version>0.0.1-SNAPSHOT</version>	   <build>          <sourceDirectory>src</sourceDirectory>          <plugins>               <plugin>                    <artifactId>maven-compiler-plugin</artifactId>                    <version>3.3</version>                    <configuration>                         <source>1.7</source>                         <target>1.7</target>                    </configuration>               </plugin>          </plugins>     </build>	   <dependencies> 	      <dependency>                     <groupId>org.apache.pig</groupId>                     <artifactId>pig</artifactId>                     <version>0.15.0</version>           </dependency> 		      <dependency>                 <groupId>org.apache.hadoop</groupId>                     <artifactId>hadoop-core</artifactId>                     <version>0.20.2</version>           </dependency>          </dependencies>  	</project>
  • 保存文件并刷新它。 Maven依赖关系部分中,可以找到下载的jar文件。

  • 创建名为 Sample_Eval 的新的类文件,并在其中复制以下内容。

import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;  import java.io.IOException; import org.apache.pig.EvalFunc; import org.apache.pig.data.Tuple;public class Sample_Eval extends EvalFunc<String>{    public String exec(Tuple input) throws IOException {         if (input == null || input.size() == 0)            return null;            String str = (String)input.get(0);            return str.toUpperCase();     } }

在编写UDF时,必须继承EvalFunc类并向 exec() 函数提供实现。在此函数中,写入UDF所需的代码。在上面的例子中,我们返回了将给定列的内容转换为大写的代码。

  • 编译完类并确认没有错误后,右键单击Sample_Eval.java文件。它将呈现一个菜单。选择“export”,如以下屏幕截图所示。

选择export
  • 点击“export”,将看到以下窗口。 点击“JAR file”

点击Export
  • 点击“Next>”按钮继续操作。将获得另一个窗口,你需要在本地文件系统中输入路径,在其中存储jar文件。

jar export
  • 最后,单击“Finish”按钮。在指定的文件夹中,创建一个Jar文件 sample_udf.jar 此jar文件包含用Java编写的UDF。

使用UDF

在编写UDF和生成Jar文件后,请按照下面给出的步骤:

步骤1:注册Jar文件

在写入UDF(在Java中)后,我们必须使用Register运算符注册包含UDF的Jar文件。通过注册Jar文件,用户可以将UDF的位置绑定到Apache Pig。

语法

下面给出了Register运算符的语法。

REGISTER path; 

让我们注册本章前面创建的sample_udf.jar。以本地模式启动Apache Pig并注册jar文件sample_udf.jar,如下所示。

$cd PIG_HOME/bin $./pig –x local REGISTER '/$PIG_HOME/sample_udf.jar'

注意:假设路径中的Jar文件:/$PIG_HOME/sample_udf.jar

步骤2:定义别名

注册UDF后,可以使用 Define 运算符为其定义一个别名。

语法

下面给出了Define运算符的语法。

DEFINE alias {function | [`command` [input] [output] [ship] [cache] [stderr] ] }; 

定义sample_eval的别名,如下所示。

DEFINE sample_eval sample_eval();

步骤3:使用UDF

定义别名后,可以使用与内置函数相同的UDF。假设在HDFS /Pig_Data/ 目录中有一个名为emp_data的文件,其中包含以下内容。

001,Robin,22,newyork002,BOB,23,Kolkata003,Maya,23,Tokyo004,Sara,25,London 005,David,23,Bhuwaneshwar 006,Maggy,22,Chennai007,Robert,22,newyork008,Syam,23,Kolkata009,Mary,25,Tokyo010,Saran,25,London 011,Stacy,25,Bhuwaneshwar 012,Kelly,22,Chennai

并假设我们已将此文件加载到Pig中,如下所示。

grunt> emp_data = LOAD 'hdfs://localhost:9000/pig_data/emp1.txt' USING PigStorage(',')   as (id:int, name:chararray, age:int, city:chararray);

现在使用UDF sample_eval 将员工的姓名转换为大写。

grunt> Upper_case = FOREACH emp_data GENERATE sample_eval(name);

请验证关系 Upper_case 的内容,如下所示。

grunt> Dump Upper_case;  (ROBIN)(BOB)(MAYA)(SARA)(DAVID)(MAGGY)(ROBERT)(SYAM)(MARY)(SARAN)(STACY)(KELLY)


在本章中,我们将了解如何以批处理模式运行Apache Pig脚本。

Pig脚本中的注释

在将脚本写入文件时,我们可以在其中包含注释,如下所示。

多行注释

我们将用'/*'开始多行注释,以'*/'结束。

/* These are the multi-line comments   In the pig script */ 

单行注释

我们将用“--"开始单行注释。

--we can write single line comments like this.

在批处理模式下执行Pig脚本

在以批处理方式执行Apache Pig语句时,请按照以下步骤操作。

步骤1

将所有需要的Pig Latin语句写在单个文件中。我们可以将所有Pig Latin语句和命令写入单个文件,并将其另存为 .pig 文件。

步骤2

执行Apache Pig脚本。你可以从shell(Linux)执行Pig脚本,如下所示。

Local模式MapReduce模式

$ pig -x local Sample_script.pig

$ pig -x mapreduce Sample_script.pig

你可以使用exec命令从Grunt shell执行它,如下所示。

grunt> exec /sample_script.pig

从HDFS执行Pig脚本

我们还可以执行驻留在HDFS中的Pig脚本。假设在名为 /pig_data/ 的HDFS目录中有名为 Sample_script.pig 的Pig脚本。我们可以执行它如下所示。

$ pig -x mapreduce hdfs://localhost:9000/pig_data/Sample_script.pig 

假设在HDFS中有一个具有以下内容的文件 student_details.txt

student_details.txt

001,Rajiv,Reddy,21,9848022337,Hyderabad 002,siddarth,Battacharya,22,9848022338,Kolkata003,Rajesh,Khanna,22,9848022339,Delhi 004,Preethi,Agarwal,21,9848022330,Pune 005,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar 006,Archana,Mishra,23,9848022335,Chennai 007,Komal,Nayak,24,9848022334,trivendram 008,Bharathi,Nambiayar,24,9848022333,Chennai

我们还在同一个HDFS目录中有一个名为 sample_script.pig 的示例脚本。此文件包含对student关系执行操作和转换的语句,如下所示。

student = LOAD 'hdfs://localhost:9000/pig_data/student_details.txt' USING PigStorage(',')   as (id:int, firstname:chararray, lastname:chararray, phone:chararray, city:chararray);	student_order = ORDER student BY age DESC;  student_limit = LIMIT student_order 4;  Dump student_limit;
  • 脚本的第一个语句会将名为 student_details.txt 的文件中的数据加载为名为student的关系。

  • 脚本的第二个语句将根据年龄以降序排列关系的元组,并将其存储为 student_order

  • 脚本的第三个语句会将 student_order 的前4个元组存储为 student_limit

  • 最后,第四个语句将转储关系 student_limit 的内容。

现在,执行 sample_script.pig ,如下所示。

$./pig -x mapreduce hdfs://localhost:9000/pig_data/sample_script.pig

Apache Pig被执行,并提供具有以下内容的输出。

(7,Komal,Nayak,24,9848022334,trivendram)(8,Bharathi,Nambiayar,24,9848022333,Chennai) (5,Trupthi,Mohanthy,23,9848022336,Bhuwaneshwar) (6,Archana,Mishra,23,9848022335,Chennai)2015-10-19 10:31:27,446 [main] INFO  org.apache.pig.Main - Pig script completed in 12minutes, 32 seconds and 751 milliseconds (752751 ms)


以下资源包含有关Apache Pig的其他信息。请使用它们获得有关此主题的更深入的知识。

Apache Pig上的有用链接

有用的Apache Pig书籍

  • Programming Pig
  • Hadoop For Dummies
  • Pig & Sqoop Refresher

Apache Pig是MapReduce的一个抽象。它是一个工具/平台,用于分析较大的数据集,将它们表示为数据流。Pig通常与 Hadoop 一起使用;我们可以使用Pig在Hadoop中执行所有的数据操作操作。