
[TOC]

Data Integration (Ingestion)

1.Oracle_To_HDFS

Sqoop is used to extract data from Oracle into HDFS. It is simple and convenient, and was built specifically for data integration between RDBMSs and HDFS.

Commonly used command structure

sqoop import | export \
--database connection options
--HDFS or Hive connection options
--configuration options

Options

Database options:
--connect jdbc:mysql://hostname:3306
--username
--password
--table
--columns
--where
-e/--query
Import options:
--delete-target-dir
--target-dir
--hcatalog-database
--hcatalog-table
Export options (see the export sketch after this list):
--export-dir
--hcatalog-database
--hcatalog-table
Other options:
-m
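
A hedged sketch of an export, since only imports are used in this section: the MySQL database test_db, target table tb_target, and HDFS path /sqoop/export/test01 below are placeholders, not objects from this environment.

sqoop export \
--connect jdbc:mysql://hostname:3306/test_db \
--username root \
--password 123456 \
--table tb_target \
--export-dir /sqoop/export/test01 \
--input-fields-terminated-by '\t' \
-m 1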

Sqoop ingestion command

sqoop import \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
--username ciss \
--password 123456 \
--table CISS4.CISS_SERVICE_WORKORDER \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_service_workorder \
--as-avrodatafile \
--fields-terminated-by "\001" \
-m 1

When testing incremental and full ingestion, the scripts must be run inside the Sqoop container.
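
As a quick sketch (the container name "sqoop" is an assumption about this environment; check docker ps for the real name):

# enter the Sqoop container and go to the project directory
docker exec -it sqoop bash
cd /opt/sqoop/one_make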

1.1 Incremental Ingestion

Sqoop generally supports three approaches to incremental ingestion.

1.1.1 Append

Requirement: the table must have an auto-incrementing column; new rows are identified by comparing that integer value.

Characteristics: only newly inserted rows can be imported; updated rows cannot.

Scenario: tables where data is only ever inserted and never updated.

sqoop import \
--connect jdbc:mysql://OneMake:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 0 \
-m 1
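
At the end of an incremental job Sqoop reports the new upper bound of the check column; pass it as --last-value on the next run so only newer rows are imported. A sketch of the follow-up run, assuming the previous import stopped at id 1200 (that value is illustrative):

sqoop import \
--connect jdbc:mysql://OneMake:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 1200 \
-m 1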

1.1.2 Lastmodified

Requirement: the table must have a column that tracks modification time; changed rows are identified by that timestamp.

Characteristics: imports both newly inserted and updated rows.

Scenario: the source tables usually cannot satisfy this requirement, so this mode is not used here.

sqoop import \
--connect jdbc:mysql://OneMake:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-06-06 16:09:32' \
-m 1
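
When the target directory already exists, lastmodified mode also has to be told how to reconcile updated rows; Sqoop's --merge-key option merges them on a key column. A hedged sketch (the primary-key column id is an assumption about tb_lastmode):

sqoop import \
--connect jdbc:mysql://OneMake:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--merge-key id \
--last-value '2021-06-06 16:09:32' \
-m 1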

1.1.3 Customization

Requirement: each run must write to a different output directory.

Characteristics: you implement the incremental filtering yourself, which allows capturing both inserts and updates.

Scenario: typically used for custom incremental ingestion of each day's partition of data into Hive.

sqoop import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where
substring(create_time,1,10) = '2021-09-14' or
substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1
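
$CONDITIONS is the placeholder where Sqoop injects its split predicates; with -m 1 it is effectively unused, but raising the parallelism also requires --split-by so Sqoop knows which column to partition the query on. A hedged variation with 4 mappers (the split column order_id is an assumption about tb_order's schema):

sqoop import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where ( substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' ) and \$CONDITIONS " \
--split-by order_id \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 4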

Of the three approaches above, only the third meets our needs, since it captures both inserts and updates.

1.1.4 Scripts

Shell

A shell script automates ingesting the incremental tables from Oracle into the corresponding HDFS paths.

The parameters are documented in the full-ingestion script later in this document.

Shell: native Linux shell scripting with a rich set of commands; mainly used for Linux automation and well suited to simple automation tasks on Linux.
Python: portable, cross-platform scripting with powerful built-in libraries; mainly used for crawlers, data science, and analytical computing, and suited to complex processing logic.
Rule of thumb: Shell is recommended for scripts under roughly 100 lines, Python for anything longer.
Ingestion scripts here: Shell.
#!/usr/bin/env bash
# Be careful when writing this shell script, especially with the SQL condition: an extra space there will make the command fail
# /bin/bash
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/incr_imp
workhome=/opt/sqoop/one_make
incr_imp_tables=${workhome}/incr_import_tables.txt

orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456

mkdir -p ${workhome}/log

sqoop_condition_params="--where \"'${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')\""
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"

# load hadoop/sqoop env
source /etc/profile

while read p; do
    # clean old directory in HDFS
    hdfs dfs -rm -r ${dw_parent_dir}/${p}/${biz_date}
    
    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    cur_time=`date "+%F %T"`
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} ${sqoop_condition_params} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_incr_imp.log
    sleep 30
    
done < ${incr_imp_tables}
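
The loop reads table names from incr_import_tables.txt, one per line; ${p^^} upper-cases the name for --table while the lower-case name is kept for the HDFS path. A hypothetical excerpt of that file (the actual table list depends on the project):

ciss4.ciss_service_workorder
ciss4.ciss_base_areas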
Python
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Author: Alex_liu
# Program function: extract the incremental table data from Oracle into the corresponding HDFS paths
import os
import subprocess
import datetime
import time
import logging

biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/incr_imp'
workhome = '/opt/sqoop/one_make'
incr_imp_tables = workhome + '/incr_import_tables.txt'
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')

orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'

sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)

# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')
# read file
fr = open(incr_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # clean old directory in HDFS
    hdfs_command = 'hdfs dfs -rm -r %s/%s/%s' % (dw_parent_dir, tblName, biz_date)
    # parallel execution import
    # ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    # sqoopImportCommand = f''' {sqoop_import_params} {sqoop_jdbc_params} --target-dir {dw_parent_dir}/{tblName}/{biz_date} --table {tblName.upper()} -m 1 &'''
    sqoopImportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper())
    # parallel execution import
    subprocess.call(sqoopImportCommand, shell=True)
    # cur_time=`date "+%F %T"`
    # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    logging.basicConfig(level=logging.INFO,
filename='%s/log/%s_incr_imp.log' % (workhome, biz_fmt_date),
                        filemode='a',
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    # logging.info(cur_time + ' : ' + sqoopImportCommand)
    logging.info(sqoopImportCommand)
    time.sleep(15)

Testing

Run these inside the Sqoop container; the scripts have already been uploaded to /opt/sqoop/one_make.

/opt/sqoop/one_make/incr_import_tables.sh
or
/opt/sqoop/one_make/incr_import_tables.py

When running the Python script on the server, delete the Chinese comments first.

Multi-line files written on Windows may carry Windows line endings when uploaded to Linux; if you hit errors caused by them, strip the carriage returns with: sed -i 's/\r//g' full_import_tables.txt

Snipaste_2023-01-03_22-53-43

The red output at the beginning is normal.

Snipaste_2023-01-03_22-57-48

You can see some of the counters for one MR job: 2,668 rows imported, 371,060 bytes written to HDFS in total, the time the job took, and so on.

All the Sqoop jobs are also visible in the Yarn WebUI (http://onemake:8088); clicking a job's History button opens the Yarn job history UI (http://onemake:19888/).

Snipaste_2023-01-03_23-06-50

It shows the basic information of the MR job; clicking the Counters button on the left shows the job's details.

Snipaste_2023-01-03_23-14-10

The detailed counters of the MR job are displayed on that page.

Snipaste_2023-01-03_23-06-10

There are 57 incremental tables, hence 57 MR jobs in total; once they have all finished, check the corresponding HDFS paths for the data files.

Snipaste_2023-01-05_18-33-56

The directories are partitioned by date, which is used later when building partitioned tables that map onto this data.

Snipaste_2023-01-05_18-34-09
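
A quick way to verify the output from the command line, as a sketch using one of the incremental tables:

# list the date partitions created for one incremental table
hdfs dfs -ls /data/dw/ods/one_make/incr_imp/ciss4.ciss_service_workorder
# list the Avro data files inside the 20210101 partition
hdfs dfs -ls /data/dw/ods/one_make/incr_imp/ciss4.ciss_service_workorder/20210101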

1.2 Full Ingestion

Scripts

Shell

A shell script automates ingesting the full tables from Oracle into the corresponding HDFS paths.

#!/usr/bin/env bash
# /bin/bash
#1.biz_date: directory date format (yyyyMMdd)
#2.biz_fmt_date: log date format (yyyy-MM-dd)
#3.dw_parent_dir: parent directory for the full import
#4.workhome: home directory of the sqoop project
#5.full_imp_tables: full path of the full-table list file
#6.paths below are based on the sqoop project home directory
biz_date=20210101
biz_fmt_date=2021-01-01
dw_parent_dir=/data/dw/ods/one_make/full_imp
workhome=/opt/sqoop/one_make
full_imp_tables=${workhome}/full_import_tables.txt
mkdir -p ${workhome}/log

#1.orcl_srv: oracle address (host or ip)
#2.orcl_port: port
#3.orcl_sid: service (SID) name
#4.orcl_user: username
#5.orcl_pwd: password
orcl_srv=oracle.bigdata.cn
orcl_port=1521
orcl_sid=helowin
orcl_user=ciss
orcl_pwd=123456

# Define the sqoop import command:
#1.sqoop_import_params: common sqoop import options (job options, generated-code output path)
#2.sqoop_jdbc_params: oracle connection options (JDBC URL, username, password)
# Load the hadoop/sqoop environment variables before running sqoop
# Loop over the table names and run a sqoop import for each one:
#  	1.run the sqoop import command in the background
#  	2.capture the current time in the cur_time variable
#  	3.record the time and the command in the log directory
#  		1.format: time (yyyy-MM-dd HH:mm:ss): sqoop command, appended to the log file
#  		2.to avoid overwhelming Oracle, sleep for a while after launching each sqoop import

sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"

# load hadoop/sqoop env
source /etc/profile

while read p; do
    # parallel execution import
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} -m 1 &
    cur_time=`date "+%F %T"`
    echo "${cur_time}: ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p} -m 1 &" >> ${workhome}/log/${biz_fmt_date}_full_imp.log
# If Oracle becomes unstable during the run, increase the sleep after each sqoop command
    sleep 15
done < ${full_imp_tables}
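
A usage sketch: every launched import command is appended to the daily log file, so you can follow progress from another terminal while the backgrounded sqoop jobs run:

sh /opt/sqoop/one_make/full_import_tables.sh
# in another terminal, follow the log written by the script
tail -f /opt/sqoop/one_make/log/2021-01-01_full_imp.log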
Python
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Author: Alex_liu
# Program function: extract the full table data from Oracle into the corresponding HDFS paths
import os
import subprocess
import datetime
import time
import logging

biz_date = '20210101'
biz_fmt_date = '2021-01-01'
dw_parent_dir = '/data/dw/ods/one_make/full_imp'
workhome = '/opt/sqoop/one_make'
full_imp_tables = workhome + '/full_import_tables.txt'
if not os.path.exists(workhome + '/log'):
    os.makedirs(workhome + '/log')

orcl_srv = 'oracle.bigdata.cn'
orcl_port = '1521'
orcl_sid = 'helowin'
orcl_user = 'ciss'
orcl_pwd = '123456'

sqoop_import_params = 'sqoop import -Dmapreduce.job.user.classpath.first=true --outdir %s/java_code --as-avrodatafile' % workhome
sqoop_jdbc_params = '--connect jdbc:oracle:thin:@%s:%s:%s --username %s --password %s' % (orcl_srv, orcl_port, orcl_sid, orcl_user, orcl_pwd)

# load hadoop/sqoop env
subprocess.call("source /etc/profile", shell=True)
print('executing...')

# read file
fr = open(full_imp_tables)
for line in fr.readlines():
    tblName = line.rstrip('\n')
    # parallel execution import
    sqoopImportCommand = '''
    %s %s --target-dir %s/%s/%s --table %s -m 1 &
    ''' % (sqoop_import_params, sqoop_jdbc_params, dw_parent_dir, tblName, biz_date, tblName.upper())
    subprocess.call(sqoopImportCommand, shell=True)
    # cur_time=`date "+%F %T"`
    # cur_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    logging.basicConfig(level=logging.INFO,  # logging level
                        filename='%s/log/%s_full_imp.log' % (workhome, biz_fmt_date),
                        # file mode: 'w' rewrites the log on every run, overwriting the previous log; 'a' appends and is the default when omitted
                        filemode='a',
                        # log format
                        format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
    # logging.info(cur_time + ' : ' + sqoopImportCommand)
    logging.info(sqoopImportCommand)
    time.sleep(15)

Testing

/opt/sqoop/one_make/full_import_tables.sh 
or
/opt/sqoop/one_make/full_import_tables.py

Sqoop log

Snipaste_2023-01-05_18-29-50

Yarn UI

Snipaste_2023-01-05_18-30-02

HDFS UI

Snipaste_2023-01-05_18-24-50

The directories are partitioned by date, which is used later when building partitioned tables that map onto this data.

Snipaste_2023-01-05_18-25-37

1.3 Schema Backup and Upload

In the earlier tests importing Oracle data into HDFS, the Avro format was chosen because of data format issues; see the project test notes for the details.

Write a script (Python or shell) that automatically uploads the .avsc files to an HDFS directory for backup and archiving.

#!/usr/bin/env bash
# upload
# /bin/bash
#1.workhome: project working directory /opt/sqoop/one_make
#2.hdfs_schema_dir: directory holding the avro schema files /CISS_Input_Data/Ingest/one_make/avsc
#3.biz_date: archive file date 20210101
#4.biz_fmt_date: log date 2021-01-01
#5.local_schema_backup_filename: local backup file name schema_${biz_date}.tar.gz
#6.hdfs_schema_backup_filename: hdfs backup file name avro_schema_${biz_date}.tar.gz
#7.log_file: full path of the log file workhome/log/upload_avro_schema_${biz_fmt_date}.log
workhome=/opt/sqoop/one_make
hdfs_schema_dir=/data/dw/ods/one_make/avsc
biz_date=20210101
biz_fmt_date=2021-01-01
local_schema_backup_filename=schema_${biz_date}.tar.gz
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
log_file=${workhome}/log/upload_avro_schema_${biz_fmt_date}.log

# logging helper
#cur_time: current time
#${cur_time} $*: appends the time plus the message passed to log() to the log file
log() {
    cur_time=`date "+%F %T"`
    echo "${cur_time} $*" >> ${log_file}
}
# load the environment variables and change into the project working directory
source /etc/profile
cd ${workhome}

#  hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.


log "Check if the HDFS Avro schema directory ${hdfs_schema_dir}..."
hdfs dfs -test -e ${hdfs_schema_dir} > /dev/null

# if the directory does not exist on HDFS, create the avro directory
if [ $? != 0 ]; then
    log "Path: ${hdfs_schema_dir} is not exists. Create a new one."
    log "hdfs dfs -mkdir -p ${hdfs_schema_dir}"
    hdfs dfs -mkdir -p ${hdfs_schema_dir}
fi

log "Check if the file ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc has uploaded to the HFDS..."

hdfs dfs -test -e ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc > /dev/null

# check whether the table's avsc file already exists in the HDFS avro directory (if not, upload all the avsc files)
if [ $? != 0 ]; then
    log "Upload all the .avsc schema file."
    log "hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}"
    hdfs dfs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
fi
log "Check if the backup tar.gz file has generated in the local server..." 

# backup: check whether the local backup archive exists (if not, create the compressed avro archive)
if [ ! -e ${local_schema_backup_filename} ]; then
    log "package and compress the schema files"
    log "tar -czf ${local_schema_backup_filename} ./java_code/*.avsc"
    tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
fi
log "Check if the backup tar.gz file has upload to the HDFS..."

# upload the avro backup archive to HDFS
hdfs dfs -test -e ${hdfs_schema_backup_filename} > /dev/null
if [ $? != 0 ]; then
    log "upload the schema package file to HDFS"
    log "hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}"
    hdfs dfs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
fi
#!/usr/bin/env python
# _*_ coding: utf-8 _*_
# Author: Alex_liu
# Program function: back up and archive the avsc files to an HDFS directory

# import pyhdfs
import logging
import os

workhome = '/opt/sqoop/one_make'
hdfs_schema_dir = '/data/dw/ods/one_make/avsc'
biz_date = '20210101'
biz_fmt_date = '2021-01-01'
local_schema_backup_filename = 'schema_%s.tar.gz' % biz_date
hdfs_schema_backup_filename = '%s/avro_schema_%s.tar.gz' % (hdfs_schema_dir, biz_date)
log_file = '%s/log/upload_avro_schema_%s.log' % (workhome, biz_fmt_date)

# append log to file
logging.basicConfig(level=logging.INFO,
                    filename=log_file,
                    filemode='a',
                    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

os.system('source /etc/profile')
os.chdir(workhome)

#  hadoop fs [generic options] [-test -[defsz] <path>]
# -test -[defsz] <path> :
#   Answer various questions about <path>, with result via exit status.
#     -d  return 0 if <path> is a directory.
#     -e  return 0 if <path> exists.
#     -f  return 0 if <path> is a file.
#     -s  return 0 if file <path> is greater than zero bytes in size.
#     -z  return 0 if file <path> is zero bytes in size, else return 1.
logging.info('Check if the HDFS Avro schema directory %s...', hdfs_schema_dir)
# hdfs = pyhdfs.HdfsClient(hosts="node1,9000", user_name="hdfs")
# print(hdfs.listdir('/'))
# hdfs dfs -test -e ${hdfs_schema_dir} > /dev/null
commStatus = os.system('hdfs dfs -test -e %s > /dev/null' % hdfs_schema_dir)
if commStatus != 0:
    logging.info('Path: %s does not exist. Create a new one.', hdfs_schema_dir)
    logging.info('hdfs dfs -mkdir -p %s', hdfs_schema_dir)
    os.system('hdfs dfs -mkdir -p %s' % hdfs_schema_dir)

logging.info('Check if the file %s/CISS4_CISS_BASE_AREAS.avsc has uploaded to the HDFS...', hdfs_schema_dir)
commStatus = os.system('hdfs dfs -test -e %s/CISS4_CISS_BASE_AREAS.avsc > /dev/null' % hdfs_schema_dir)
if commStatus != 0:
    logging.info('Upload all the .avsc schema file.')
    logging.info('hdfs dfs -put %s/java_code/*.avsc %s', workhome, hdfs_schema_dir)
    os.system('hdfs dfs -put %s/java_code/*.avsc %s' % (workhome, hdfs_schema_dir))

# backup
logging.info('Check if the backup tar.gz file has generated in the local server...')
commStatus = os.system('[ -e %s ]' % local_schema_backup_filename)
if commStatus != 0:
    logging.info('package and compress the schema files')
    logging.info('tar -czf %s ./java_code/*.avsc', local_schema_backup_filename)
    os.system('tar -czf %s ./java_code/*.avsc' % local_schema_backup_filename)

logging.info('Check if the backup tar.gz file has upload to the HDFS...')
commStatus = os.system('hdfs dfs -test -e %s > /dev/null' % hdfs_schema_backup_filename)
if commStatus != 0:
    logging.info('upload the schema package file to HDFS')
    logging.info('hdfs dfs -put %s %s', local_schema_backup_filename, hdfs_schema_backup_filename)
    os.system('hdfs dfs -put %s %s' %(local_schema_backup_filename, hdfs_schema_backup_filename))

Testing

/opt/sqoop/one_make/upload_avro_schema.sh
or
/opt/sqoop/one_make/upload_avro_schema.py

The schema files have been uploaded to HDFS.

Snipaste_2023-01-05_19-29-03

Snipaste_2023-01-05_19-27-19
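
To double-check from the command line, a sketch that lists the uploaded schema files and the tar.gz backup, then prints the beginning of one schema (Avro .avsc files are plain JSON):

hdfs dfs -ls /data/dw/ods/one_make/avsc
hdfs dfs -cat /data/dw/ods/one_make/avsc/CISS4_CISS_BASE_AREAS.avsc | head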