This article explains how to use OGG (Oracle GoldenGate) to ship Oracle data into Flume and from there into Kafka.

Source test server environment setup.

1. Create the groups and the oracle user, and fix directory ownership:

[root@test ~]# groupadd oinstall
[root@test ~]# groupadd dba
[root@test ~]# useradd -g oinstall -G dba oracle
[root@test ~]# chown -R oracle:oinstall /data

2. Set the global Java environment variables

[root@test ~]# cat /etc/redhat-release
CentOS release 6.4 (Final)

[oracle@test data]$ tar -zxvf jdk-8u60-linux-x64.tar.gz

As root, set the Java environment variables in /etc/profile:

vi /etc/profile
###jdk
export JAVA_HOME=/data/jdk1.8.0_60
export JAVA_BIN=/data/jdk1.8.0_60/bin
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
export LD_LIBRARY_PATH=/data/jdk1.8.0_60/jre/lib/amd64/server:$LD_LIBRARY_PATH

Switch to the oracle user and verify:

[root@test ~]# su - oracle
[oracle@test ~]$ java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)

If the new JDK does not take effect, register it with alternatives:

alternatives --install /usr/bin/java java /data/jdk1.8.0_60/bin/java 100
alternatives --install /usr/bin/jar jar /data/jdk1.8.0_60/bin/jar 100
alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100
update-alternatives --install /usr/bin/javac javac /data/jdk1.8.0_60/bin/javac 100

[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.

  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*+ 2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
   4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number: 4

[root@test1 data]# /usr/sbin/alternatives --config java
There are 4 programs which provide 'java'.
  Selection    Command
-----------------------------------------------
   1           /usr/lib/jvm/jre-1.6.0-openjdk.x86_64/bin/java
*  2           /usr/lib/jvm/jre-1.7.0-openjdk.x86_64/bin/java
   3           /usr/lib/jvm/jre-1.5.0-gcj/bin/java
 + 4           /data/jdk1.8.0_60/bin/java

Enter to keep the current selection[+], or type selection number:
[root@test1 data]#
[root@test1 data]# java -version
java version "1.8.0_60"
Java(TM) SE Runtime Environment (build 1.8.0_60-b27)
Java HotSpot(TM) 64-Bit Server VM (build 25.60-b23, mixed mode)
[root@test1 data]#

Edit the Flume configuration:

[oracle@test1 conf]$ cat flume-conf.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.
# See the License for the
# specific language governing permissions and limitations
# under the License.

# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = seq
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = fileChannel

agent.sources.r1.type = avro
agent.sources.r1.port = 14141
agent.sources.r1.bind = 192.168.88.66
agent.sources.r1.channels = fileChannel

# Each sink's type must be defined
agent.sinks.loggerSink.type = logger
# Specify the channel the sink should use
agent.sinks.loggerSink.channel = memoryChannel

# kafka sink
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = my_schema
agent.sinks.kafkaSink.brokerList = 192.168.88.1:9092,192.168.88.2:9092,192.168.88.3:9092,192.168.88.4:9092
agent.sinks.kafkaSink.requiredAcks = 1
agent.sinks.kafkaSink.batchSize = 20
agent.sinks.kafkaSink.channel = fileChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

# File Channel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.transactionCapacity = 20000000
agent.channels.fileChannel.capacity = 50000000
agent.channels.fileChannel.maxFileSize = 2147483648
agent.channels.fileChannel.minimumRequiredSpace = 52428800
agent.channels.fileChannel.keep-alive = 3
agent.channels.fileChannel.checkpointInterval = 20000
agent.channels.fileChannel.checkpointDir = /data/apache-flume-1.6.0-bin/CheckpointDir
agent.channels.fileChannel.dataDirs = /data/apache-flume-1.6.0-bin/DataDir
[oracle@test1 conf]$

############ Configure OGG on the source database ############

Create a new extract process on the source:

dblogin userid goldengate, password goldengate
add extract EXTJMS, tranlog, threads 2, begin now
add exttrail /data/goldengate/dirdat/kf, extract EXTJMS megabytes 200
add schematrandata my_schema
add trandata my_schema.*

The extract parameter file:

extract EXTJMS
setenv (ORACLE_SID="testdb")
setenv (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
userid goldengate, password goldengate
TRANLOGOPTIONS DBLOGREADER
exttrail /data/goldengate/dirdat/kf
discardfile /data/goldengate/dirrpt/EXTJMS.dsc, append
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
numfiles 3000
CHECKPOINTSECS 20
DISCARDROLLOVER AT 05:30
dynamicresolution
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
ddl &
include mapped &
exclude objtype 'TRIGGER' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude objtype 'TYPE' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "my_schema.DBMS_TABCOMP_TEMP_UNCMP"
FETCHOPTIONS NOUSESNAPSHOT, USELATESTVERSION, MISSINGROW REPORT
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;
--extract table user
TABLE my_schema.*;

Enable supplemental logging on the database:

SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
Database altered.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;
Database altered.
SQL> select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, FORCE_LOGGING from v$database;

SUPPLEME SUP SUP FOR
-------- --- --- ---
YES      YES YES YES

Add a new pump process on the source (tested against the my_schema source database):

add extract EDPKF, exttrailsource /data/goldengate/dirdat/kf, begin now
edit param EDPKF

EXTRACT EDPKF
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
RMTHOST 192.168.88.66, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kp
DISCARDFILE ./dirrpt/EDPKF.dsc, APPEND, MEGABYTES 5
TABLE my_schema.*;

add rmttrail /data/ogg_for_bigdata/dirdat/kp, extract EDPKF megabytes 200

Build the table definitions file:

edit param defgen

userid goldengate, password goldengate
defsfile dirdef/my_schema.def
TABLE my_schema.*;

Generate the definitions file and transfer it to the target:

./defgen paramfile ./dirprm/defgen.prm

Target-side mgr parameters:

PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *, RETRIES 5, WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45

Add the UE (user exit) data pump on the target.

Version used: Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209

ADD EXTRACT JMSFLM, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kp
edit param JMSFLM

GGSCI (localhost.localdomain) 18> view param JMSFLM

EXTRACT JMSFLM
SETENV (GGS_USEREXIT_CONF ="dirprm/JMSFLM.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/my_schema.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE my_schema.MV*;
TABLE my_schema.*;
--alter prodjms extseqno 736, extrba 0

Note: the target side does not need an Oracle database installed at all; it can share a host with Flume, with the data finally flowing to the Kafka servers that receive the messages. This example relays through Flume, which works without problems; you can equally push the data straight into Kafka, the principle is the same. The same pattern extends well to other big-data targets such as MySQL, MongoDB, or HDFS.

The handler parameter file:

$ cat JMSFLM.props
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=com.goldengate.delivery.handler.flume.FlumeHandler
gg.handler.flumehandler.host=192.168.88.66
gg.handler.flumehandler.port=14141
gg.handler.flumehandler.rpcType=avro
gg.handler.flumehandler.delimiter=u0001
gg.handler.flumehandler.mode=op
gg.handler.flumehandler.includeOpType=true
# Indicates if the operation timestamp should be included as part of output in the delimited separated values
# true - Operation timestamp will be included in the output
# false - Operation timestamp will not be included in the output
# Default :- true
#gg.handler.flumehandler.includeOpTimestamp=true
#gg.handler.name.deleteOpKey=D
#gg.handler.name.updateOpKey=U
#gg.handler.name.insertOpKey=I
#gg.handler.name.pKUpdateOpKey=P
#gg.handler.name.includeOpType=true
# Optional properties to use the transaction grouping functionality
#gg.handler.flumehandler.maxGroupSize=1000
#gg.handler.flumehandler.minGroupSize=1000
### native library config ###
goldengate.userexit.nochkpt=TRUE
goldengate.userexit.timestamp=utc
goldengate.log.logname=cuserexit
goldengate.log.level=DEBUG
goldengate.log.tofile=true
goldengate.userexit.writers=javawriter
goldengate.log.level.JAVAUSEREXIT=DEBUG
#gg.brokentrail=true
gg.report.time=30sec
gg.classpath=/data/ogg_for_bigdata/dirprm/flumejar/*:/data/apache-flume-1.6.0-bin/lib/*
javawriter.stats.full=TRUE
javawriter.stats.display=TRUE
javawriter.bootoptions=-Xmx81920m -Xms20480m -Djava.class.path=/data/ogg_for_bigdata/ggjava/ggjava.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties

That covers the full path for pushing Oracle data through OGG into Flume and on to Kafka; try it out in your own environment.
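As a final sanity check on the consuming side, here is a small Python sketch of what decoding one of these messages could look like. With delimiter=u0001 and includeOpType=true, each message is a '\u0001'-separated string whose first field is the operation type (I/U/D/P, matching the commented-out *OpKey defaults above). The exact field layout of the sample record is illustrative only, not real handler output.

```python
# Sketch: split a delimited record as produced with delimiter=u0001 and
# includeOpType=true. The sample payload below is hypothetical.
OP_NAMES = {"I": "insert", "U": "update", "D": "delete", "P": "pk-update"}

def parse_record(raw):
    """Return (operation name, remaining fields) for one message."""
    fields = raw.split("\u0001")
    op, payload = fields[0], fields[1:]
    return OP_NAMES.get(op, "unknown"), payload

op, cols = parse_record("I\u0001MY_SCHEMA.T1\u00012016-01-01\u0001100")
assert op == "insert"
assert cols == ["MY_SCHEMA.T1", "2016-01-01", "100"]
```

A real consumer would apply the same split to each message body pulled from the my_schema topic.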
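Looking back at the Flume side, a quick way to sanity-check flume-conf.properties before starting the agent is to parse it and confirm the avro source and the Kafka sink point at the same channel; a mismatch there silently strands events. A minimal sketch (the config lines are embedded as a string for self-containment; in practice you would read the real file):

```python
# Minimal sketch: parse flume-style properties and verify the
# source -> channel -> sink wiring used in this article.
CONF = """\
agent.sources = r1
agent.channels = fileChannel
agent.sinks = kafkaSink
agent.sources.r1.type = avro
agent.sources.r1.channels = fileChannel
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.topic = my_schema
agent.sinks.kafkaSink.channel = fileChannel
agent.channels.fileChannel.type = file
"""

def parse_props(text):
    """Parse simple 'key = value' lines, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#") and "=" in line:
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

props = parse_props(CONF)
# Source and sink must share a channel for events to flow OGG -> Kafka.
assert props["agent.sources.r1.channels"] == props["agent.sinks.kafkaSink.channel"]
print("events flow to topic:", props["agent.sinks.kafkaSink.topic"])
```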