
Oracle GoldenGate Handler for Flume Example

Introduction
This how-to paper illustrates the architecture, configuration, and implementation of a custom adapter for streaming relational database transactions to Flume using the Oracle GoldenGate Adapter. Please note that the code and configuration of the Flume adapter illustrated in this how-to are only meant to highlight the capabilities of the Oracle GoldenGate product.
As such, no support is provided by Oracle for the code and configuration illustrated in this paper.
The code shown in this document is also contained in the accompanying archive
SampleHandlerFlume.zip. For simplicity the contents of the zip can be unpacked into the dirprm
subdirectory of the Oracle GoldenGate installation.

Hadoop Configuration
It is assumed that the reader has working knowledge of Hadoop technologies; if not, hadoop.apache.org is a good starting point for learning about Hadoop. It is also assumed that Hadoop and Flume are installed and configured.
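As a quick sanity check (commands and locations vary by distribution; a CDH-style layout is assumed here), verify that the Hadoop and Flume command-line tools are available:

> hadoop version
> flume-ng version
> hdfs dfs -ls /user/hive/warehouse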

Oracle GoldenGate
Identify the type and version of the OS platform (Linux, Windows, etc.) and the type and version of the source relational data store (Oracle, MySQL, etc.). Using this information about the OS and database, download the appropriate Oracle GoldenGate binary. Refer to the Oracle GoldenGate guides for installing and configuring Oracle GoldenGate. If this is your first time using GoldenGate, it is recommended to first set up a simple end-to-end replication between two relational databases to better grasp the concepts.
Download and install the Oracle GoldenGate Adapter. Please refer to the Oracle GoldenGate Adapters Administrator's Guide for Java to become familiar with the terminology and configuration of the Java adapter.

Architecture of the Custom Java Adapter for Flume


Referring to the architecture diagram below, the custom handler integration with Flume is developed using Oracle GoldenGate's Java API and Apache Flume's Client API. The custom adapter is deployed as an integral part of the Oracle GoldenGate Pump process. The Pump and the custom adapter are configured through the Pump parameter file and the custom adapter's properties file. The adapter properties for integrating with Flume are discussed in the implementation section below.
The Pump process executes the adapter in its address space. The Pump reads the trail file created by the Oracle GoldenGate Extract process and passes the transactions to the adapter. Based on the configuration, the adapter writes the transactions, in the desired format and with the appropriate content, into files that match the Hive DDL for the table. Please refer to the Oracle GoldenGate Developing Custom Java Adapter Handbook for details about the architecture and about developing a custom adapter.
[Architecture diagram: Source Database → Capture → Trail Files → Pump (with the Flume Adapter, driven by the Pump parameter file and the adapter properties file) → Flume Agent (Source → Channel → Sink, driven by the Flume config file) → HDFS/Hive.]

Implementation and Configuration


For the purpose of illustration in this paper, we used MySQL version 5.1 as the relational data store, Oracle Enterprise Linux 6.4, Cloudera CDH 5.1, and Oracle GoldenGate 11.2.1.0.1 for Linux and MySQL.
The overall steps involved in replicating data from the MySQL database to Flume are:

Create a table in the MySQL database and prepare the table for replication.
Configure Flume for delivery to HDFS.
Configure the Oracle GoldenGate Capture to extract transactions from the MySQL
database and create the trail file.
Configure the property file for the Flume handler.
Code, compile, and package the custom Flume handler.
Configure the Oracle GoldenGate Pump to read the trail and invoke the custom
adapter.
(Optional) Create a table in Hive that corresponds to the table in the MySQL database.
Execute a test.

Create a table in the MySQL database.


Make sure the MySQL database is prepared for GoldenGate capture by enabling the binary log and
setting up a /tmp/mysql.sock file. Please consult the Oracle GoldenGate MySQL Installation and
Setup Guide for details.
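As a point of reference, the relevant my.cnf settings typically look like the following sketch (values are illustrative and the authoritative list is in the installation guide; the log-bin location matches the ALTLOGDEST path used in the Capture parameter file later in this paper):

[mysqld]
server-id     = 1
socket        = /tmp/mysql.sock
log-bin       = /var/lib/mysql/log/bigdatalite-bin
binlog_format = ROW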
In this example the table is created in a database called odidemo:
create table odidemo.MOVIE
(
  MOVIE_ID     NUMERIC NOT NULL,
  TITLE        VARCHAR(250),
  YEAR         NUMERIC,
  BUDGET       NUMERIC,
  GROSS        NUMERIC,
  PLOT_SUMMARY VARCHAR(4000)
) ENGINE=InnoDB;

Configure Flume for delivery to HDFS


This sample Flume configuration listens for RPC calls on its source, uses a memory channel, and delivers to the HDFS directory /user/hive/warehouse/%{table}. The %{table} expression refers to a Flume event header that is populated by the custom Flume handler with the source table name in lowercase characters. The HDFS sink is configured to roll the target file every 60 seconds.
This is the sample Flume configuration file flume.conf:
oggmovie.channels = memoryChannel
oggmovie.sources = rpcSource
oggmovie.sinks = hdfs-sink
oggmovie.channels.memoryChannel.type = memory
oggmovie.sources.rpcSource.channels = memoryChannel
oggmovie.sources.rpcSource.type = avro
oggmovie.sources.rpcSource.bind = 0.0.0.0
oggmovie.sources.rpcSource.port = 41414
oggmovie.sinks.hdfs-sink.type=hdfs
oggmovie.sinks.hdfs-sink.hdfs.path=hdfs://localhost/user/hive/warehouse/%{table}
oggmovie.sinks.hdfs-sink.hdfs.filePrefix=ogg_movie

oggmovie.sinks.hdfs-sink.hdfs.fileType=DataStream
oggmovie.sinks.hdfs-sink.hdfs.rollInterval=60
oggmovie.sinks.hdfs-sink.hdfs.fileSuffix=.txt
oggmovie.sinks.hdfs-sink.channel=memoryChannel
Please start the Flume agent using the following command:
> flume-ng agent --conf conf --conf-file flume.conf --name oggmovie -Dflume.root.logger=INFO,console

Please make sure the Flume agent is running when starting the GoldenGate Pump process later.
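To confirm that the agent's Avro source is actually listening before moving on, you can check the port configured above (41414), for example:

> netstat -an | grep 41414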
Configure Oracle GoldenGate Capture process
Sample Capture parameter file dirprm/emov.prm:
EXTRACT emov
DBOPTIONS HOST localhost, CONNECTIONPORT 3306
SOURCEDB odidemo, USERID root, PASSWORD welcome1
EXTTRAIL ./dirdat/tm
GETUPDATEBEFORES
TRANLOGOPTIONS ALTLOGDEST /var/lib/mysql/log/bigdatalite-bin.index
TABLE odidemo.*;
Add and start the Extract using the following commands from the 'ggsci' command shell and ensure it starts successfully.
ggsci>add extract emov, tranlog, begin now
ggsci>add exttrail ./dirdat/tm, extract emov, megabytes 50
ggsci>start extract emov
ggsci>info emov
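Once the Extract is running and there is activity on the source tables, trail files with the prefix configured in EXTTRAIL should appear on disk; a quick check:

> ls -l dirdat/tm*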

Configure the Flume handler property file.


Create the properties file dirprm/pmov.properties:
#Adapter Logging parameters.
gg.log=log4j
gg.log.level=info
#Adapter Check pointing parameters
goldengate.userexit.nochkpt=false
# Java User Exit Property
goldengate.userexit.writers=jvm
jvm.bootoptions= -Xms64m -Xmx512M -Dlog4j.configuration=log4j.properties -Djava.class.path=dirprm:/u01/ogg/dirprm/myflumehandler.jar:/u01/ogg/ggjava/ggjava.jar:/usr/lib/hadoop/client/hadoop-core.jar:/usr/lib/hadoop/client/hadoop-common.jar:/usr/lib/hadoop/client/guava.jar:/usr/lib/hadoop/client/commons-configuration-1.6.jar:/usr/lib/hadoop/client/hadoop-hdfs.jar:/usr/lib/hadoop/client/hadoop-auth.jar:/etc/hadoop/conf:/usr/lib/hadoop/client/commons-cli.jar:/usr/lib/hadoop/client/protobuf-java.jar:/usr/lib/hadoop/lib/netty-3.6.2.Final.jar:/usr/lib/hadoop/lib/avro-1.7.5-cdh5.1.0.jar:/usr/lib/flume-ng/lib/avro-ipc.jar:/usr/lib/flume-ng/lib/jackson-mapper-asl-1.9.3.jar:/usr/lib/flume-ng/lib/jackson-core-asl-1.9.3.jar:/usr/lib/flume-ng/lib/flume-ng-sdk-1.5.0-cdh5.1.0.jar
# Minimum number of {records, seconds} before generating a report
jvm.stats.time=3600
jvm.stats.numrecs=5000
jvm.stats.display=TRUE
jvm.stats.full=TRUE
#Flume Handler.
gg.handlerlist=flumehandler
gg.handler.flumehandler.type=com.oracle.demo.SampleHandlerFlume
# Use for delimiters other than the default \1 delimiter
# gg.handler.flumehandler.recordDelimiter=,
gg.handler.flumehandler.flumePort=41414
gg.handler.flumehandler.flumeHost=localhost

Flume Handler Code


The following listing shows sample code for the custom handler. The use case scenario is simple: write the transaction into a Flume event using the Flume RpcClient API.
Create the code in a file named SampleHandlerFlume.java and place the file in a subdirectory tree com/oracle/demo to match the Java package. Notice that the code illustrates the usage of custom logging; additional logging may be added as required.
SampleHandlerFlume.java:
package com.oracle.demo;

import java.nio.charset.Charset;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

import com.goldengate.atg.datasource.*;
import com.goldengate.atg.datasource.adapt.*;
import com.goldengate.atg.datasource.meta.*;
import com.goldengate.atg.util.GGException;

import static com.goldengate.atg.datasource.GGDataSource.Status;

public class SampleHandlerFlume extends AbstractHandler {

    private static final Logger logger =
        LoggerFactory.getLogger(SampleHandlerFlume.class);

    // Flume event header names; the "table" header drives the %{table} sink path.
    private static final String TABLE_HEADER = "table";
    private static final String SCHEMA_HEADER = "schema";

    private String recordDelimiter = "\1"; // default Hive field delimiter

    // Flume RPC connection settings (overridable via handler properties).
    private String flumeHost = "localhost";
    private int flumePort = 41414;
    private RpcClient flumeClient;

    public SampleHandlerFlume() {
        super(TxOpMode.op); // define default mode for handler
    }

    @Override
    public void init(DsConfiguration conf, DsMetaData metaData) {
        logger.info("Initializing handler");
        super.init(conf, metaData); // always call 'super'
        // Set up the RPC connection to the Flume agent's Avro source.
        flumeClient = RpcClientFactory.getDefaultInstance(flumeHost, flumePort);
    }

    public void insertFlume(String data, String table, String schema) {
        try {
            // Create a Flume Event object that encapsulates the record data.
            Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));
            Map<String, String> headers = event.getHeaders();
            headers.put(SCHEMA_HEADER, schema);
            headers.put(TABLE_HEADER, table);
            event.setHeaders(headers);
            flumeClient.append(event);
        } catch (EventDeliveryException ex) {
            logger.error("Caught Exception", ex);
            throw new GGException("Exception in insertFlume", ex);
        }
    }

    // Property setter methods.

    public void setRecordDelimiter(String delim) {
        logger.info("set record delimiter: " + delim);
        this.recordDelimiter = delim;
    }

    public void setFlumeHost(String flumeHost) {
        this.flumeHost = flumeHost;
    }

    public void setFlumePort(String flumePort) {
        this.flumePort = Integer.parseInt(flumePort);
    }

    @Override
    public Status operationAdded(DsEvent e, DsTransaction transaction,
                                 DsOperation operation) {
        logger.debug("operationAdded");
        super.operationAdded(e, transaction, operation);
        // Tx/Op/Col adapters wrap metadata & values behind a single, simple
        // interface if using the DataSourceListener API (via AbstractHandler).
        final TableMetaData tMeta =
            getMetaData().getTableMetaData(operation.getTableName());
        final Op op = new Op(operation, tMeta, getConfig());
        processOp(op); // process data...
        return Status.OK;
    }

    private void processOp(Op op) {
        logger.debug("processOp: op={}", op.getPosition());
        DsOperation.OpType opType = op.getOperationType();
        if (!opType.isInsert())
            return;

        logger.debug(" ===> Processing operation: table='" + op.getTableName() +
                     "', pos=" + op.getPosition() + ", ts=" + op.getTimestamp());

        TableName tname = op.getTableName();
        TableMetaData tMeta = getMetaData().getTableMetaData(tname);

        // Concatenate the column values, separated by the record delimiter.
        StringBuffer strBuf = new StringBuffer("");
        int i = 0;
        boolean first = true;
        for (Col c : op) {
            if (first)
                first = false;
            else
                strBuf.append(recordDelimiter);
            logger.debug("Col: " + tMeta.getColumnName(i) + "=" + c + "," + c.getValue());
            strBuf.append(c.getValue());
            i++;
        }

        String tableName = op.getTableName().getShortName().toLowerCase();
        String schemaName = op.getTableName().getSchemaName();
        insertFlume(strBuf.toString(), tableName, schemaName);
    }

    public String reportStatus() {
        logger.debug("reportStatus");
        return "Status report";
    }
}
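If the Pump later fails with event delivery errors, it can help to verify Flume connectivity independently of GoldenGate. The following standalone client is a minimal sketch (the class name is hypothetical and not part of the handler); it uses the same RpcClient API and assumes the agent from the earlier configuration is listening on localhost:41414.

package com.oracle.demo;

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

// Hypothetical helper: sends one test event to the Flume Avro source so
// connectivity can be verified before starting the Pump.
public class FlumeConnectivityTest {
    public static void main(String[] args) throws EventDeliveryException {
        // Connect to the Flume Avro source started from flume.conf.
        RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 41414);
        try {
            // Build one test event; the "table" header drives the %{table} path of the HDFS sink.
            Event event = EventBuilder.withBody("connectivity test", Charset.forName("UTF-8"));
            event.getHeaders().put("table", "movie");
            client.append(event);
            System.out.println("Test event delivered to Flume");
        } finally {
            client.close();
        }
    }
}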

Compile and create the jar.


Ensure that the Java home, library path, and class path environment variables are properly set:

export JAVA_HOME=<Java installation directory>
export PATH=$JAVA_HOME/bin:$PATH
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
export CLASSPATH=/usr/lib/flume-ng/lib/*:/usr/lib/hadoop/*:/usr/lib/hadoop/lib/*:[OGG installation dir]/ggjava/ggjava.jar:/etc/hadoop/conf:.

Compile the code using the following command in the java source directory.
>javac com/oracle/demo/SampleHandlerFlume.java

Successfully executing the above command will result in creating SampleHandlerFlume.class under the subdirectory com/oracle/demo.
Create the jar using the following command in the Java source directory:
>jar cvf myflumehandler.jar com
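Optionally, confirm that the class file was packaged:

> jar tf myflumehandler.jar

The listing should include com/oracle/demo/SampleHandlerFlume.class.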

Add custom logging


Copy the file for the desired type and level of logging from <Oracle GoldenGate Directory>/ggjava/resources/classes to the <Oracle GoldenGate Directory>/dirprm directory and make the following changes. This example uses log4j and the desired level is trace, hence the file log4j-trace.properties should be used. Copy this file into the <Oracle GoldenGate Directory>/dirprm directory and rename it mycompanylog4j.properties. Add the following lines to enable custom logging:
log4j.logger.com.mycompany.bigdata=TRACE, stdout, rolling
log4j.additivity.com.mycompany.bigdata=false
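The stdout and rolling appenders referenced above are expected to be defined in the copied template; if your copy does not define them, a minimal log4j 1.x appender definition along these lines can be added (file location and sizes are illustrative):

log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=dirrpt/mycompany.log
log4j.appender.rolling.MaxFileSize=10MB
log4j.appender.rolling.MaxBackupIndex=10
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d [%t] %-5p %c - %m%n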

Configure and Start Oracle GoldenGate Pump process


Sample Pump parameter file dirprm/pmov.prm:
extract pmov
SOURCEDB odidemo, USERID root, PASSWORD welcome1
sourcedefs ./dirdef/emov.def
CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES
TABLE odidemo.*;

Notes about the parameter file:


emov.def - The source definitions file is used by the Java event handler framework to obtain metadata information about the tables. When using a source definitions file, source database access is not required. A source definitions file is created with the 'defgen' utility, which is installed as part of the Oracle GoldenGate database replication product.
By default the Pump will use the properties file named pmov.properties in the 'dirprm' directory; its contents were discussed in the previous sections.
Add and start the Pump using the following commands from the 'ggsci' command shell and ensure it starts successfully. Before starting the Pump process, ensure that the Flume agent is running and that the properties file and the handler jar discussed in the previous sections are in place.
ggsci> dblogin sourcedb odidemo userid root, password welcome1
ggsci> add extract pmov, exttrailsource ./dirdat/tm
ggsci>start pmov
ggsci>info pmov

Alternatively, instead of starting pmov in GGSCI, you can execute the process from the OS command line. This is useful for debugging and has the advantage that all output and error messages appear directly on standard output.
> extract paramfile ./dirprm/pmov.prm
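If the Pump abends on startup (for example, because a jar listed in java.class.path cannot be found), the process report usually contains the underlying Java exception:

ggsci> view report pmov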

Create a table in Hive (optional)


From the Hive command prompt issue the following command and ensure that the table is created.
Hive> CREATE TABLE MOVIE(movie_id int, title string, year int, budget int, gross int,
plot_summary string);
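Hive's default field delimiter is \001 (Ctrl-A), which matches the handler's default recordDelimiter, so the simple DDL above works with the files delivered by the HDFS sink. If a different delimiter is configured in the properties file, declare it in the DDL, for example:

Hive> CREATE TABLE MOVIE(movie_id int, title string, year int, budget int, gross int, plot_summary string)
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';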

Execute a test
Perform the following transaction on the source database:
insert into MOVIE (MOVIE_ID, TITLE, YEAR, BUDGET, GROSS, PLOT_SUMMARY)
values (1, 'Sharknado 2', 2014, 500000, 20000000, 'Flying sharks attack city');
commit;
Verify the record is replicated to Hive by issuing the following SQL on Hive.
hive> select * from movie;
OK
1    Sharknado 2    2014    500000    20000000    Flying sharks attack city
Time taken: 7.69 seconds, Fetched: 1 row(s)
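You can also confirm that the Flume HDFS sink wrote a rolled file under the target directory:

> hdfs dfs -ls /user/hive/warehouse/movie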
