进击大数据系列(十二)Hadoop 数据同步工具 Sqoop

前面介绍了前面介绍了 Hadoop 基本概念与生态、安装(HDFS+YARN+MapReduce)实战操作、常用命令、架构基石 HDFS、统一资源管理和调度平台 YARN、分布式计算框架 MapReduce 、数据仓库 Hive、计...

前面介绍了前面介绍了 Hadoop 基本概念与生态安装(HDFS+YARN+MapReduce)实战操作常用命令架构基石 HDFS统一资源管理和调度平台 YARN分布式计算框架 MapReduce 、数据仓库 Hive计算引擎 Spark实时计算流计算引擎 Flink 数据库 HbaseHadoop 任务调度框架 Oozie等相关的知识点,天我将详细的为大家介绍 大数据 Hadoop 数据同步工具 Sqoop 相关知识。

Sqoop 概述

传统的应用程序管理系统,即应用程序与使用 RDBMS 的关系数据库的交互,是产生大数据的来源之一。由 RDBMS 生成的这种大数据存储在关系数据库结构中的关系数据库服务器中。

当大数据存储和 Hadoop 生态系统的 MapReduce,Hive,HBase,Cassandra,Pig 等分析器出现时,他们需要一种工具来与关系数据库服务器进行交互,以导入和导出驻留在其中的数据。在这里,Sqoop 在 Hadoop 生态系统中占据一席之地,以便在关系数据库服务器和 Hadoop 的 HDFS 之间提供可行的交互。

attachments-2023-09-gVW4oxkG65151fddcac8d,png

Sqoop 是一个用于在 Hadoop 和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到 Hadoop HDFS,并从Hadoop 文件系统导出到关系数据库。它由 Apache 软件基金会提供。

  • 官网:http://sqoop.apache.org/

sqoop2 比 sqoop1 的改进

  • 引入 sqoop server,集中化管理 connector 等
  • 多种访问方式:CLI,Web UI,REST API
  • 引入基于角色 的安全机制

基本架构如下

attachments-2023-09-Qlq0XZ2465151ffab2076,png

1.x 的架构

attachments-2023-09-B6RcwqAY6515202ad5b2a,png

需要注意的是Sqoop的1.x 和 2.x 的版本差异比较大,但是1.99算是2.x版本的,所以在使用的时候需要注意。

attachments-2023-09-6M4kBZfL6515205e90fb2,png

Sqoop 如何工作

下图描述了Sqoop的工作流程。

attachments-2023-09-haSu3Xqv651520718dbc1,png

Sqoop导入

导入工具从RDBMS向HDFS导入单独的表。表中的每一行都被视为HDFS中的记录。所有记录都以文本文件的形式存储在文本文件中或作为Avro和Sequence文件中的二进制数据存储。

attachments-2023-09-Fe3kzGrn6515208a165bf,png

  • sqoop会通过jdbc来获取需要的数据库的元数据信息,例如:导入的表的列名,数据类型。
  • 这些数据库的数据类型会被映射成为java的数据类型,根据这些信息,sqoop会生成一个与表名相同的类用来完成序列化工作,保存表中的每一行记录。
  • sqoop开启MapReduce作业
  • 启动的作业在input的过程中,会通过jdbc读取数据表中的内容,这时,会使用sqoop生成的类进行序列化。
  • 最后将这些记录写到hdfs上,在写入hdfs的过程中,同样会使用sqoop生成的类进行反序列化。

Sqoop导出

导出工具将一组文件从HDFS导出回RDBMS。给Sqoop输入的文件包含记录,这些记录在表中被称为行。这些被读取并解析成一组记录并用用户指定的分隔符分隔。

attachments-2023-09-h9fLEcco651520a253807,png

  • 首先sqoop通过jdbc访问关系型数据库获取需要导出的信息的元数据信息
  • 根据获取的元数据信息,sqoop生成一个Java类,用来承载数据的传输,该类必须实现序列化
  • 启动MapReduce程序
  • sqoop利用生成的这个类,并行从hdfs中获取数据
  • 每个map作业都会根据读取到的导出表的元数据信息和读取到的数据,生成一批insert 语句,然后多个map作业会并行的向MySQL中插入数据。
  • Sqoop 安装

    由于Sqoop是Hadoop的子项目,因此它只能在Linux操作系统上运行。按照以下步骤在您的系统上安装Sqoop。

    安装java Hadoop

    java和Hadoop的安装就不介绍了。

    下载Sqoop

    我们可以从以下:http://archive.apache.org/dist/sqoop/下载最新版本的Sqoop 对于本教程,我们使用1.4.7版本,即sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz ,其实我们看到sqoop 已经很久没更新了,其实这主要两方面原因:

    • sqoop已经很稳定了
    • sqoop太老了,已经没有多少人在用了
    • attachments-2023-09-PfNSngkc651520c571d0e,png

安装Sqoop

以下命令用于提取sqoop的压缩包移动到/usr/local目录下并解压。

sudo cp ~/Downloads/sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz /usr/local/
tar xvf sqoop-1.4.7.bin__hadoop-2.6.0.tar.gz

解压完成后配置环境变量,可以通过在~/.bash_profile文件中添加以下行来设置Sqoop环境,注意不同的shell 可能不一样,但是/etc/profile肯定是可以的。

vim ~/.bash_profile
# 添加环境变量
export SQOOP_HOME=/usr/local/sqoop-1.4.7.bin__hadoop-2.6.0

刷新环境变量source ~/.bash_profile,我们可以输入sqoop 后按tab 键,发现我们的环境变量已经生效了

attachments-2023-09-AJe2y8kv651520f768e68,png

配置Sqoop

为了能使得Sqoop正常工作,例如相关数据我们还需要配置大数据的相关环境,Sqoop同步原理是通过MR 来实现的,所以需要编辑sqoop-env.sh文件,该文件被放置在$ SQOOP_HOME/conf目录。首先,重定向到Sqoop config目录并使用以下命令复制模板文件。

cd $SQOOP_HOME/conf  
mv sqoop-env-template.sh sqoop-env.sh

打开sqoop-env.sh并编辑以下行

export HADOOP_COMMON_HOME=/usr/local/hadoop  
export HADOOP_MAPRED_HOME=/usr/local/hadoop

下载并配置mysql-connector-java

我们从以下maven 仓库下载该jar https://mvnrepository.com/artifact/mysql/mysql-connector-java/5.1.49,然后 使用下面的命令移动jar 包到$SQOOP_HOME/lib/

cp ~/Downloads/mysql-connector-java-5.1.49.jar $SQOOP\_HOME/lib/

验证Sqoop

以下命令用于验证Sqoop版本。

sqoop-version

#输出
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]  
2022-09-30 14:59:15,766 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7  
Sqoop 1.4.7  
git commit id 2328971411f57f0cb683dfb79d19d4d19d185dd8  
Compiled by maugli on Thu Dec 21 15:59:58 STD 2017

Sqoop安装完成,其实这里有一些告警信息,暂时不管,后面有问题再处理

Warning: /usr/local/sqoop-1.4.7.bin__hadoop-2.6.0/../hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
Warning: /usr/local/sqoop-1.4.7.bin__hadoop-2.6.0/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.

Sqoop Import

下面绍如何将数据从MySQL数据库导入到Hadoop HDFS。Import从RDBMS将单个表导入HDFS。表中的每一行都被视为HDFS中的记录。所有记录均以文本数据的形式存储在文本文件中,或作为Avro和Sequence文件中的二进制数据存储。

数据准备

以下语法用于将数据导入HDFS。

$ sqoop import (generic-args) (import-args)

我们有一个world 库,里面有个表叫做city,数据如下:

attachments-2023-09-P9HijAXV65152122129a1,png

导入数据

Sqoop导入工具用于将表格数据从表格导入到Hadoop文件系统,作为文本文件或二进制文件。这里我们先尝试最小命令,也就是那些必须的参数,然后我们看看为了满足我们的目的可以使用哪些参数。

以下命令用于将world 库中的city表从MySQL数据库服务器导入到HDFS。

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1

如果它成功执行,则会得到以下输出。

2022-09-30 16:46:40,311 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1664527475421_0001/
2022-09-30 16:46:40,312 INFO mapreduce.Job: Running job: job_1664527475421_0001
2022-09-30 16:46:48,463 INFO mapreduce.Job: Job job_1664527475421_0001 running in uber mode : false
2022-09-30 16:46:48,464 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-30 16:46:53,529 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-30 16:46:53,539 INFO mapreduce.Job: Job job_1664527475421_0001 completed successfully
2022-09-30 16:46:53,663 INFO mapreduce.Job: Counters: 31
  File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=235070
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=87
    HDFS: Number of bytes written=144481
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    HDFS: Number of bytes read erasure-coded=0
  Job Counters
    Launched map tasks=1
    Other local map tasks=1
    Total time spent by all maps in occupied slots (ms)=2420
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=2420
    Total vcore-milliseconds taken by all map tasks=2420
    Total megabyte-milliseconds taken by all map tasks=2478080
  Map-Reduce Framework
    Map input records=4079
    Map output records=4079
    Input split bytes=87
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=32
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=317718528
  File Input Format Counters
    Bytes Read=0
  File Output Format Counters
    Bytes Written=144481
2022-09-30 16:46:53,667 INFO mapreduce.ImportJobBase: Transferred 141.0947 KB in 17.0677 seconds (8.2668 KB/sec)
2022-09-30 16:46:53,670 INFO mapreduce.ImportJobBase: Retrieved 4079 records.

我们可以看到这是一个没有reduce 的MR,我们输入输出数据是4079条。

Map input records=4079  
Map output records=4079

其实上面我们并没有指定要将数据同步到HDFS 上的那个目录下去,那其实默认是在/user/XXX/city,其中 XXX 是用户名,city 是表名。

attachments-2023-09-VGS4mSMS651521427f9ad,png

导入目标目录

我们可以在使用Sqoop将表中数据导入HDFS时指定目标目录。以下是将目标目录指定为Sqoop导入命令的选项的语法。

--target-dir <new or exist directory in HDFS>

以下命令用于将emp_add表数据导入到’/ queryresult’目录中。

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1 \
--target-dir /tmp/city

我们可以看到数据已经在/tmp/city目录中了

attachments-2023-09-PGZL5saI6515215be3a46,png

需要注意的是我们要保证/tmp/city目录不存在,否则就有下面的报错(这里演示的时候使用的是/tmp)

 Import failed: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://kingcall:9000/tmp already exists
  at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java

attachments-2023-09-W3OXSmWY651521a192b79,png

解决这个问题其实很简单,只需要添加一个选项--delete-target-dir即可

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--delete-target-dir \
--m 1 

# 或者
sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1 \
--delete-target-dir \
--target-dir /tmp/city

导入数据的子集

我们可以使用Sqoop的时候再where子句中加上一个过滤条件从而只导入我们想要的一部分,例如每天的增量数据。它在相应的数据库服务器中执行相应的SQL查询,并将结果存储在HDFS中的目标目录中。

where子句的语法如下。

--where <condition>

以下命令用于导入city表数据的子集。这里导入人口大于100万册城市

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1 \
--where "Population>=1000000" \
--target-dir /tmp/city

我们明显看到数据文件小了很多

attachments-2023-09-qBHqj5WP651521d5067db,png

日志中也显示只有238条数据

Map-Reduce Framework
  Map input records=238
  Map output records=238
  Input split bytes=87
  Spilled Records=0
  Failed Shuffles=0
  Merged Map outputs=0
  GC time elapsed (ms)=29
  CPU time spent (ms)=0
  Physical memory (bytes) snapshot=0
  Virtual memory (bytes) snapshot=0
  Total committed heap usage (bytes)=320864256
File Input Format Counters
  Bytes Read=0
File Output Format Counters
  Bytes Written=8481

增量导入

增量导入是一种仅导入表中新添加的行的技术。需要添加’incremental’,'check-column’和’last-value’选项来执行增量导入。

以下语法用于Sqoop导入命令中的增量选项。

--incremental <mode>
--check-column <column name>
--last value <last check column value>

我们的city 表是自增主键,我们假设上次把前4000条都同步进来了,那么我们的last-value就是4000,其实我们知道总共有4079条数据,那么这次导入的应该是79条数据。

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1 \
--incremental append \
--check-column id \
--last-value 4000 \
--target-dir /tmp/city

首先我们看到数据同步进来了,而且文件小了很多,那是因为只有79条数据

attachments-2023-09-HCGSlwGy65152205611c8,png

关于是否只有79条,我们可以看日志,Map input records=79说明我们的假设没有问题

2022-09-30 17:13:15,148 INFO mapreduce.Job: Running job: job_1664527475421_0004
2022-09-30 17:13:21,254 INFO mapreduce.Job: Job job_1664527475421_0004 running in uber mode : false
2022-09-30 17:13:21,255 INFO mapreduce.Job:  map 0% reduce 0%
2022-09-30 17:13:25,297 INFO mapreduce.Job:  map 100% reduce 0%
2022-09-30 17:13:25,304 INFO mapreduce.Job: Job job_1664527475421_0004 completed successfully
2022-09-30 17:13:25,394 INFO mapreduce.Job: Counters: 31
  File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=235302
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=87
    HDFS: Number of bytes written=2752
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    HDFS: Number of bytes read erasure-coded=0
  Job Counters
    Launched map tasks=1
    Other local map tasks=1
    Total time spent by all maps in occupied slots (ms)=1918
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=1918
    Total vcore-milliseconds taken by all map tasks=1918
    Total megabyte-milliseconds taken by all map tasks=1964032
  Map-Reduce Framework
    Map input records=79
    Map output records=79
    Input split bytes=87
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=33
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=327155712
  File Input Format Counters
    Bytes Read=0
  File Output Format Counters
    Bytes Written=2752
2022-09-30 17:13:25,401 INFO mapreduce.ImportJobBase: Transferred 2.6875 KB in 11.6835 seconds (235.5467 bytes/sec)
2022-09-30 17:13:25,403 INFO mapreduce.ImportJobBase: Retrieved 79 records.
2022-09-30 17:13:25,418 INFO util.AppendUtils: Creating missing output directory - city
2022-09-30 17:13:25,432 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
2022-09-30 17:13:25,432 INFO tool.ImportTool:  --incremental append
2022-09-30 17:13:25,432 INFO tool.ImportTool:   --check-column id
2022-09-30 17:13:25,432 INFO tool.ImportTool:   --last-value 4079
2022-09-30 17:13:25,432 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')

而且这个日志有意思的是告诉你了下次last-value的值是4079 ,那其实我们会发现有点不合理的地方,那就是这个last-value我们每次需要去手动维护,后面我们看这个问题如何解决。

需要注意的是增量作业,就不能和--delete-target-dir选项一起使用了,这很好理解如果你都将历史数据删除了(已经存在的),那还怎么增量呢。

查询导入(query )

上面在导入数据的子集的时候演示了通过--where选项导入部分数据,也就是满足条件的,下面我们通过--query选项也可以导入子集,只不过使用--query的时候就不能使用--table了,因为我们的数据是从--query来的,这个很好理解。

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--delete-target-dir \
--target-dir /tmp/city \
--query "SELECT * FROM city LIMIT 5" \
--m 1

不过需要注意的是我们必须要指定--target-dir,否则报错如下:

Must specify destination with --target-dir
Try --help for usage instructions.

其实我们可以思考一下为什么,因为在使用--table的时候,我们的数据文件夹有默认的名字也就是表名字,也有默认的路径,但是我们使用--query的时候我们的数据文件夹是没有名字的,总不能搞一个随机的名字吧,既然没有默认的文件夹名字肯定就没有默认的目录了,所以需要我们指定。

下面的错误是因为--query模式必须要跟一个CONDITIONS

2022-09-30 19:56:17,771 ERROR tool.ImportTool: Import failed: java.io.IOException: Query [SELECT * FROM city LIMIT 5] must contain '$CONDITIONS' in WHERE clause.
  at org.apache.sqoop.manager.ConnManager.getColumnTypes(ConnManager.java:332)
  at org.apache.sqoop.orm.ClassWriter.getColumnTypes(ClassWriter.java:1872)
  at org.apache.sqoop.orm.ClassWriter.generate(ClassWriter.java:1671)
  at org.apache.sqoop.tool.CodeGenTool.generateORM(CodeGenTool.java:106)
  at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:501)
  at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:628)
  at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
  at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
  at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
  at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
  at org.apache.sqoop.Sqoop.main(Sqoop.java:252)

我们稍稍改造一下即可

sqoop import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--delete-target-dir \
--target-dir /tmp/city \
--query "SELECT * FROM city  where \$CONDITIONS LIMIT 5" \
--m 1

可以看到导入了5条数据

  Map-Reduce Framework
    Map input records=5
    Map output records=5
    Input split bytes=87
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=29
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=321388544
  File Input Format Counters
    Bytes Read=0
  File Output Format Counters
    Bytes Written=153
2022-09-30 20:07:31,549 INFO mapreduce.ImportJobBase: Transferred 153 bytes in 11.6347 seconds (13.1503 bytes/sec)
2022-09-30 20:07:31,551 INFO mapreduce.ImportJobBase: Retrieved 5 records.

既然table 可以导入子集合,为什么还要query 呢,因为query 更灵活,例如我们可以在query 中完成数据类型转换,运算,增加列、删除列操作。

导入所有表

下面介绍如何将所有表从RDBMS数据库服务器导入到HDFS。每个表格数据存储在一个单独的目录中,并且目录名称与表格名称相同。

以下语法用于导入所有表。

$ sqoop import-all-tables (generic-args) (import-args) 

world 库下面有三张表

attachments-2023-09-35Wg4NEo6515222a2a0aa,png

以下命令用于从userdb数据库中导入所有表。

sqoop  import-all-tables \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 

由于我们没有指定目录,那么则会导入到当前用户的目录下, 但是我们看到报错了。

2022-09-30 17:23:21,540 INFO db.DBInputFormat: Using read commited transaction isolation
2022-09-30 17:23:21,540 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`Code`), MAX(`Code`) FROM `country`
2022-09-30 17:23:21,546 INFO mapreduce.JobSubmitter: Cleaning up the staging area /tmp/hadoop-yarn/staging/liuwenqiang/.staging/job_1664527475421_0006
2022-09-30 17:23:21,551 ERROR tool.ImportAllTablesTool: Encountered IOException running import job: java.io.IOException: Generating splits for a textual index column allowed only in case of "-Dorg.apache.sqoop.splitter.allow_text_splitter=true" property passed as a parameter

就是说我们要允许对字符串类型的索引进行split,这是什么意思呢,如果你注意的话,发现我们前面一直有个参数--m 1也就是启动只有一个Map 底MR 任务来完成抽数,由于我们这里没有指定所以Sqoop 就需要判断到底是启动几个Map 来完成抽数,判断标准就是对主键字段进行split ,但是由于我们的一个表的逐渐字段是字符串,所以导致出现了这个问题。

其实我们可以看到我们的city 表已经抽数成功了,只是在抽其他表的时候失败了。

attachments-2023-09-iNpX7Wif65152244926ee,png

我们进入city表的目录,可以看到四个数据文件,那说明我们是通过4个map 抽数过来的,这一点也可以通过下面的日志印证。

attachments-2023-09-Pz3IRoVx6515225a16aca,png

Job Counters
  Launched map tasks=4
  Other local map tasks=4
  Total time spent by all maps in occupied slots (ms)=8254
  Total time spent by all reduces in occupied slots (ms)=0
  Total time spent by all map tasks (ms)=8254
  Total vcore-milliseconds taken by all map tasks=8254
  Total megabyte-milliseconds taken by all map tasks=8452096

既然知道了原因,解决方案就很多了,例如改造主键,或者是我们直接指定使用一个map 来,这样就不用split 了。

sqoop  import-all-tables \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--m 1

数据已经被成功抽过来了

attachments-2023-09-iOJJZTEh651522ff78fe0,png

由于有三张表,所以我们看到启了三个任务,然后由于我们限制了map ,所以每个任务都只有一个map。

attachments-2023-09-Uk0HNCNX6515231d9cf3f,png

或者我们可以指定-Dorg.apache.sqoop.splitter.allow_text_splitter=true选项,允许对字符串累类型的主键进行分割,不过这个时候要求所有的表都有主键。

sqoop  import-all-tables \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 

我们看到数据已经被成功导入了

attachments-2023-09-jO3LYWGO65152345b5b52,png

总结

  • 只使用一个map
  • 有主键并且是数字类型
  • 有主键如果是字符串,则指定-Dorg.apache.sqoop.splitter.allow_text_splitter=true

Sqoop Export

下面如何将数据从HDFS导出回RDBMS数据库。目标表必须存在于目标数据库中。输入给Sqoop的文件包含记录,这些记录在表中称为行。这些被读取并解析成一组记录并用用户指定的分隔符分隔。

  • 缺省操作是使用INSERT语句将输入文件中的所有记录插入到数据库表中。
  • 在更新模式下,Sqoop生成将现有记录替换到数据库中的UPDATE语句。

以下是导出命令的语法。

sqoop export (generic-args) (export-args) 

让我们以HDFS中的文件中的员工数据为例。雇员数据在HDFS的’/tmp/emp/'目录下的数据文件中,数据如下:

1201, gopal,     manager, 50000, TP
1202, manisha,   preader, 50000, TP
1203, kalil,     php dev, 30000, AC
1204, prasanth,  php dev, 30000, AC
1205, kranthi,   admin,   20000, TP
1206, satish p,  grp des, 20000, GR

必须手动创建要导出的表,并将其导出到数据库中。以下查询用于在mysql命令行中创建表’employee’。

$ mysql
mysql> use emp;
mysql> CREATE TABLE employee ( 
   id INT NOT NULL PRIMARY KEY
   name VARCHAR(20), 
   deg VARCHAR(20),
   salary INT,
   dept VARCHAR(10));

以下命令用于将表数据(位于HDFS上的emp_data文件中)导出到Mysql数据库服务器的db数据库中的employee表中。

sqoop export \
--connect jdbc:mysql://localhost/emp \
--username root \
--password www1234 \
--table employee \ 
--export-dir /tmp/emp

以下命令用于验证mysql命令行中的表。

mysql>select * from employee;

可以看到数据已经成功导出

+------+--------------+-------------+-------------------+--------+
| Id   | Name         | Designation | Salary            | Dept   |
+------+--------------+-------------+-------------------+--------+
| 1201 | gopal        | manager     | 50000             | TP     |
| 1202 | manisha      | preader     | 50000             | TP     |
| 1203 | kalil        | php dev     | 30000             | AC     |
| 1204 | prasanth     | php dev     | 30000             | AC     |
| 1205 | kranthi      | admin       | 20000             | TP     |
| 1206 | satish p     | grp des     | 20000             | GR     |
+------+--------------+-------------+-------------------+--------+

Sqoop Job

前面我们介绍的都是直接在Sqoop 中输入一大串命令完成数据的导入导出,下面介绍如何创建和维护Sqoop作业,通过创建Sqoop作业保存导入和导出命令。

它指定参数来识别创建、调用作业。这种重新调用或重新执行主要用于增量导入,它可以将更新的行从RDBMS表导入HDFS,其实在前面我们在使用Sqoop进行增量同步的时候,是需要指定 last-value 的。但一般我们都是自动化进行数据同步的,这就需要有一个地方,能够自动记录和填充 上次增量同步的 last-value。

抛开手动维护这个last-value的繁琐,而且还很容易失败。所以Sqoop通过Job 这种方式解决了这个问题。

以下是创建Sqoop作业的语法。

qoop job (generic-args) (job-args)
   [-- [subtool-name] (subtool-args)]

创建作业(–create)

我们在这里创建一个名为myjob的作业,它可以将表数据从RDBMS表导入HDFS。以下命令用于创建将数据从db数据库中的employee表导入到HDFS文件的作业。

sqoop job --create job_import_city \
-- import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1

遇到了一个报错,这主要是因为缺少java-json.jar

2022-09-30 18:25:05,552 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
2022-09-30 18:25:05,910 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
2022-09-30 18:25:05,935 ERROR sqoop.Sqoop: Got exception running Sqoop: java.lang.NullPointerException
java.lang.NullPointerException
  at org.json.JSONObject.<init>(JSONObject.java:144)
  at org.apache.sqoop.util.SqoopJsonUtil.getJsonStringforMap(SqoopJsonUtil.java:43)
  at org.apache.sqoop.SqoopOptions.writeProperties(SqoopOptions.java:785)
  at org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.createInternal(HsqldbJobStorage.java:399)
  at org.apache.sqoop.metastore.hsqldb.HsqldbJobStorage.create(HsqldbJobStorage.java:379)
  at org.apache.sqoop.tool.JobTool.createJob(JobTool.java:181)
  at org.apache.sqoop.tool.JobTool.run(JobTool.java:294)
  at org.apache.sqoop.Sqoop.run(Sqoop.java:147)
  at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
  at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:183)
  at org.apache.sqoop.Sqoop.runTool(Sqoop.java:234)
  at org.apache.sqoop.Sqoop.runTool(Sqoop.java:243)
  at org.apache.sqoop.Sqoop.main(Sqoop.java:252)

我们从http://www.java2s.com/Code/JarDownload/java-json/java-json.jar.zip下载这个依赖,然后放到$SQOOP_HOME/lib目录下。

cp ~/Downloads/java-json.jar $SQOOP_HOME/lib

当我们再次执行创建作业的时候又说作业已经存在。

attachments-2023-09-RJSNecOk65152381746fd,png

所以我们先删除一下,再创建。

# 删除作业
sqoop job --delete job_import_city
# 创建作业

sqoop job --create job_import_city \
-- import \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--table city \
--m 1

这次我们就创建成功了

attachments-2023-09-GFiEVAbY6515239998739,png

列出作业(–list)

--list参数可以列出创建的job

sqoop job --list

它显示保存的作业列表。

Available jobs
   job_import_city

检查作业(–show)

'–show’参数用于检查或验证特定作业及其细节

sqoop job --show job_import_city

它显示了job_import_city的详细信息,有一个输入密码的环节,由于我们没有设置秘密所以直接回车就好

2022-09-30 18:40:07,869 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
Enter password:
Job: job_import_city
Tool: import
Options:
----------------------------
verbose = false
hcatalog.drop.and.create.table = false
db.connect.string = jdbc:mysql://localhost/world
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
split.limit = null
hbase.create.table = false
mainframe.input.dataset.type = p
db.require.password = true
skip.dist.cache = false
hdfs.append.dir = false
db.table = city
codegen.input.delimiters.escape = 0
accumulo.create.table = false
import.fetch.size = null
codegen.input.delimiters.enclose.required = false
db.username = root
reset.onemapper = false
codegen.output.delimiters.record = 10
import.max.inline.lob.size = 16777216
sqoop.throwOnError = false
hbase.bulk.load.enabled = false
hcatalog.create.table = false
db.clear.staging.table = false
codegen.input.delimiters.record = 0
enable.compression = false
hive.overwrite.table = false
hive.import = false
codegen.input.delimiters.enclose = 0
accumulo.batch.size = 10240000
hive.drop.delims = false
customtool.options.jsonmap = {}
codegen.output.delimiters.enclose = 0
hdfs.delete-target.dir = false
codegen.output.dir = .
codegen.auto.compile.dir = true
relaxed.isolation = false
mapreduce.num.mappers = 1
accumulo.max.latency = 5000
import.direct.split.size = 0
sqlconnection.metadata.transaction.isolation.level = 2
codegen.output.delimiters.field = 44
export.new.update = UpdateOnly
incremental.mode = None
hdfs.file.format = TextFile
sqoop.oracle.escaping.disabled = true
codegen.compile.dir = /tmp/sqoop-liuwenqiang/compile/d7767a1386302f6e759aa443dfb7f8a4
direct.import = false
temporary.dirRoot = _sqoop
hive.fail.table.exists = false
db.batch = false

执行作业(–exec)

–exec选项用于执行保存的作业。以下命令用于执行名为job_import_city的作业。sqoop job --exec job_import_city然后报错了,信息如下,这主要是因为有一个输入密码的环节我直接回车了,不过从这个报错信息可以看出来是要我们输入数据源的密码。

attachments-2023-09-NgwAG86o651523b464e99,png

java.sql.SQLException: Access denied for user 'root'@'localhost' (using password: NO)
  at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3933)
  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3869)
  at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:864)
  at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.ja

这个时候你可以看--show的输出中并没有输出mysql 的密码信息,也就是说只保存了用户,所以当我们输入密码后我们的任务就执行成功了。

attachments-2023-09-Ic5H42W5651523ccc0ba8,png

既然又了作业(job),那我们就可以多次调用,不过由于我们没有添加--delete-target-dir 选项,多次调用会报错,因为目录已经存在。

attachments-2023-09-6G4WQf4i651523e368f8e,png

所以你创建job的时候需要将这个参数加上。

删除作业( --delete )

我们可以通过下面的命令删除作业

sqoop job --delete job_import_city

手动输入密码的问题

我们前面已经遇到手动输入密码的问题了,现在我们看下这个问题的解决方案,sqoop 提供了--password-file 选项来解决这个问题,就是我们创建一个密码文件,然后放在hdfs 上,然后通过--password-file来提供给hdfs。

# 创建密码文件
echo -n "www1234" > sqoop_world_db.pwd
# 创建hdfs目录
hdfs dfs -mkdir -p /sqoop/pwd
# 上传密码文件
hdfs dfs -put sqoop_world_db.pwd /sqoop/pwd

如下所示

attachments-2023-09-ud835O8d651523fa4a049,png

删除前面的job ,重新创建,并且指定--password-file。

# 删除作业
sqoop job --delete job_import_city
# 创建作业
sqoop job --create job_import_city \
-- import \
--connect jdbc:mysql://localhost/world \
--username root \
--table city \
--delete-target-dir \
--password-file /sqoop/pwd/sqoop_world_db.pwd \
--m 1
# 执行作业
sqoop job --exec job_import_city

这次我们发现任务成功执行。

增量作业

前面我们说了Sqoop 的作业主要是为了解决增量的问题,我们看看它是如何解决的last-value的维护问题的,下面我们创建一个作业,指定last-value是4000

sqoop job --create job_import_city_incremental \
-- import \
--connect jdbc:mysql://localhost/world \
--username root \
--table city \
--password-file /sqoop/pwd/sqoop_world_db.pwd \
--check-column id \
--last-value 4000 \
--incremental append \
--m 1

执行作业之前我们看一下当前的数据情况。

attachments-2023-09-n7Coes5065152410e58bc,png

执行作业

sqoop job --exec job_import_city_incremental

可以看到同步了79数据进来,这是正确的

2022-09-30 19:27:24,506 INFO mapreduce.Job: Counters: 31
  File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=235899
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=87
    HDFS: Number of bytes written=2752
    HDFS: Number of read operations=6
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=2
    HDFS: Number of bytes read erasure-coded=0
  Job Counters
    Launched map tasks=1
    Other local map tasks=1
    Total time spent by all maps in occupied slots (ms)=1880
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=1880
    Total vcore-milliseconds taken by all map tasks=1880
    Total megabyte-milliseconds taken by all map tasks=1925120
  Map-Reduce Framework
    Map input records=79
    Map output records=79
    Input split bytes=87
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=28
    CPU time spent (ms)=0
    Physical memory (bytes) snapshot=0
    Virtual memory (bytes) snapshot=0
    Total committed heap usage (bytes)=324009984
  File Input Format Counters
    Bytes Read=0
  File Output Format Counters
    Bytes Written=2752
2022-09-30 19:27:24,509 INFO mapreduce.ImportJobBase: Transferred 2.6875 KB in 11.7101 seconds (235.0113 bytes/sec)

我们看一下执行了一次之后的数据情况,多了一个数据文件。

attachments-2023-09-XkR3jww565152430c41d2,png

如果我们再次执行作业还能同步79条数据进来的话,那说明我们的增量作业没有生效,也就是没有帮我们维护last-value,因为我们目前没有新数据进来,我们发现执行后根本没有启动MR任务。

attachments-2023-09-3oyj4rfB65152445c113d,png

因为没有检测到有新的数据

2022-09-30 19:28:49,154 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`id`) FROM `city`
2022-09-30 19:28:49,155 INFO tool.ImportTool: Incremental import based on column `id`
2022-09-30 19:28:49,155 INFO tool.ImportTool: No new rows detected since last import.

我们看一下作业的详细信息sqoop job --show job_import_city_incremental我们发现last-value 是4079 ,也就是sqoop job 帮我们维护了这个值。

verbose = false
hcatalog.drop.and.create.table = false
incremental.last.value = 4079
db.connect.string = jdbc:mysql://localhost/world
codegen.output.delimiters.escape = 0
codegen.output.delimiters.enclose.required = false
codegen.input.delimiters.field = 0
mainframe.input.dataset.type = p
split.limit = null
hbase.create.table = false
skip.dist.cache = false
hdfs.append.dir = true
db.table = city

Eval

下面介绍如何使用Sqoop 的 Eval 工具。它允许用户针对数据库服务器执行用户定义的查询,并在控制台中预览结果。所以,用户可以导入自定义查询的数据。使用eval,我们可以执行任何类型的SQL语句,可以是DDL或DML语句。以下语法用于Sqoop eval命令。

sqoop eval(generic-args) (eval-args) 

查询语句

使用eval工具,我们可以评估任何类型的SQL查询。让我们举一个在db数据库的employee表中选择有限行的例子。以下命令用于评估使用SQL查询的给定示例。

sqoop eval \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 \
--query "SELECT * FROM city LIMIT 5"

如果该命令执行成功,则它将在终端上产生以下输出。

+------+--------------+-------------+-------------------+--------+
| Id   | Name         | Designation | Salary            | Dept   |
+------+--------------+-------------+-------------------+--------+
| 1201 | gopal        | manager     | 50000             | TP     |
| 1202 | manisha      | preader     | 50000             | TP     |
| 1203 | kalil        | php dev     | 30000             | AC     |
| 1204 | prasanth     | php dev     | 30000             | AC     |
| 1205 | kranthi      | admin       | 20000             | TP     |
| 1206 | satish p     | grp des     | 20000             | GR     |
+------+--------------+-------------+-------------------+--------+

插入语句

同理我们可以通过eval 执行插入,不过使用场景很少,上边的查询倒是经常用,不过主要是导出查询出的数据到HDFS的场景。

列出数据库和表

这个没什么好介绍的,几乎不用,我们知道有这么个东西即可。

列出数据库

sqoop list-databases \
--connect jdbc:mysql://localhost/ \
--username root \
--password www1234 

information_schema
azkaban
batch
bdp
data_home
dolphinscheduler
......

列出特定库下面的表

sqoop list-tables \
--connect jdbc:mysql://localhost/world \
--username root \
--password www1234 

结果如下

city
country
countrylanguage
  • 发表于 2023-09-28 14:39
  • 阅读 ( 33 )

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
石天
石天

437 篇文章

作家榜 »

  1. shitian 662 文章
  2. 石天 437 文章
  3. 每天惠23 33 文章
  4. 小A 29 文章