Table of Contents

  1. Preface
  2. Introduction to Informatica Connector Toolkit
  3. Before you begin
  4. Develop a connector for Cloud Data Integration
  5. Develop a connector for Data Loader
  6. Import a connector
  7. Connection attributes
  8. Type system
  9. Connector metadata
  10. Partitioning capability
  11. Pushdown capability
  12. Mappings in advanced mode
  13. Manual changes to Informatica Connector Toolkit source code
  14. Runtime behavior
  15. Connector example: MySQL_Cloud
  16. Version control integration
  17. Appendix A: Metadata models
  18. Appendix B: ASO model
  19. Appendix C: Connector project migration
  20. Appendix D: Frequently used generic APIs in Informatica Connector Toolkit
  21. Appendix E: Frequently asked questions

Cloud Data Integration Connector Toolkit Developer Guide

Project, classes, and methods

After you define the connector metadata for the pushdown capability and generate the code, the Informatica Connector Toolkit creates the runtime.spark project, which contains the DataFrameAdapter.scala class.
When you enable the read and write capabilities for mappings that run on an advanced cluster, you must implement the read and write methods in the DataFrameAdapter.scala Scala class that the Informatica Connector Toolkit generates.
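At a high level, your implementation extends the DataFrameAdapter class and overrides these two methods. The following minimal sketch shows only the method signatures; the class name is illustrative and the method bodies are placeholders that you replace with connector-specific logic. The complete MySQL_Cloud example appears after it.

import com.informatica.sdk.scalatask.dataframe.AdapterContext
import com.informatica.sdk.scalatask.dataframe.DataFrameAdapter
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame

// Minimal sketch; the class name is illustrative.
class MySampleDataFrameAdapter extends DataFrameAdapter {

  // Return a DataFrame that contains the source data.
  override def read(sparkContext: SparkContext, adpContext: AdapterContext): DataFrame = {
    ??? // connector-specific read logic
  }

  // Write the rows of the provided DataFrame to the target.
  override def write(sparkContext: SparkContext, adpContext: AdapterContext, dataFrame: DataFrame) = {
    ??? // connector-specific write logic
  }
}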
The following code snippet shows an example of the Scala class with the read and write methods for the MySQL_Cloud connector:
package com.infa.adapter.msql.runtime.scalatask

import java.util.Properties
import com.informatica.sdk.adapter.metadata.projection.semantic.iface.Operation
import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.NativeSink
import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.PlatformSink
import com.informatica.sdk.scalatask.dataframe.AdapterContext
import com.informatica.sdk.scalatask.dataframe.DataFrameAdapter
import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.ASOComplexType
import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.ReadCapabilityAttributes
import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.WriteCapabilityAttributes
import com.informatica.sdk.adapter.metadata.common.datasourceoperation.semantic.iface.Capability
import com.informatica.sdk.adapter.metadata.common.datasourceoperation.semantic.iface.ReadCapability
import com.informatica.sdk.adapter.metadata.patternblocks.field.semantic.iface.Field
import com.informatica.sdk.adapter.metadata.patternblocks.flatrecord.semantic.iface.FlatRecord
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.BinaryOperatorEnum
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.Constant
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.DateConstant
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.DecimalConstant
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.FieldIdentifier
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.IntegerConstant
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.StringConstant
import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.SDKExpression
import com.informatica.sdk.adapter.metadata.projection.filteroperation.semantic.iface.FilterOperation
import com.informatica.sdk.adapter.metadata.projection.joinoperation.semantic.iface.JoinOperation
import com.informatica.sdk.adapter.metadata.projection.projectionoperation.semantic.iface.ProjectionOperation
import com.informatica.sdk.adapter.metadata.projection.projectionoperation.semantic.iface.SelectTypeEnum
import com.informatica.sdk.adapter.metadata.projection.semantic.iface.FieldBase
import com.informatica.sdk.adapter.metadata.projection.semantic.iface.NativeField
import com.informatica.sdk.adapter.metadata.projection.semantic.iface.OperationBase
import com.informatica.sdk.adapter.metadata.projection.simpleexpression.semantic.iface.SimpleBinaryExpression
import com.informatica.sdk.adapter.metadata.projection.simpleexpression.semantic.iface.SimpleSDKExpression
import com.informatica.sdk.adapter.metadata.projection.sortoperation.semantic.iface.SortOperation
import com.informatica.sdk.adapter.metadata.projection.sortoperation.semantic.iface.SortOrderEnum
import com.informatica.sdk.adapter.metadata.projection.sourceoperation.semantic.iface.NativeSource
import com.informatica.sdk.scalatask.dataframe._
import java.text.MessageFormat
import java.math.BigDecimal
import java.math.BigInteger
import java.text.SimpleDateFormat
import java.util.ArrayList
import java.util.logging.Logger
import java.util.logging.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DateType
import com.informatica.sdk.adapter.metadata.aso.semantic.iface.ASOOperation
import scala.collection.JavaConverters._
import com.informatica.sdk.adapter.metadata.extension.semantic.iface.KeyValueExtension
import com.informatica.sdk.adapter.metadata.projection.semantic.iface.Projection
import org.apache.spark.sql.streaming.StreamingQuery
import scala.collection.mutable.ListBuffer;

/*
 * This represents the Spark pushdown specific class that adapters can extend to provide
 * optimized Spark pushdown for providing/consuming dataframes.
 */
class MSQLDataFrameAdapter extends DataFrameAdapter {

  private var JDBC_DRIVER = "com.mysql.jdbc.Driver";
  private var asoOp: ASOOperation = null;
  private var logger: Logger = null;
  private var tableName: String = "";
  private var connectedFields: ArrayList[FieldInfo] = null;
  private var isTruncateTarget: Boolean = false;
  private val TRUNCATE_TARGET: String = "truncateTargetTable";
  private var nativeRecords = new ListBuffer[FlatRecord]();

  /*
   * This method reads data from the external data source and returns a dataframe that contains the source data.
   * Sample read Scala code for MySQL is provided in this method.
   */
  override def read(sparkContext: SparkContext, adpContext: AdapterContext): DataFrame = {
    // Initialize the MySQL JDBC driver
    Class.forName("com.mysql.jdbc.Driver");
    // Fetch the logger from the AdapterContext
    var logger = adpContext.getLogger();
    // Get the ASO operation
    var asoOp = adpContext.getASOOperation();
    // Fetch the connection attributes
    var connAttrs = adpContext.getConnectionAttributes();
    var query: String = null;
    // Initialize all the connection attributes to variables
    var user: String = connAttrs.get("username").asInstanceOf[String];
    var password: String = connAttrs.get("password").asInstanceOf[String];
    var host: String = connAttrs.get("host").asInstanceOf[String];
    var port: Int = connAttrs.get("port").asInstanceOf[Int];
    var catalog: String = connAttrs.get("catalog").asInstanceOf[String];
    var connectionURL: String = connAttrs.get("connectionURL").asInstanceOf[String];
    logger.log(Level.INFO, "Read scala method invoked");
    // Generate the MySQL JDBC URL using the connection attributes
    if (catalog != null && !catalog.isEmpty()) {
      connectionURL = "jdbc:mysql://" + host + ":" + port + "/" + catalog;
    } else {
      connectionURL = "jdbc:mysql://" + host + ":" + port;
    }
    // Create a properties object with all the connection attributes
    val connProps = new Properties
    connProps.put("user", s"${user}");
    connProps.put("password", s"${password}");
    connProps.put("Driver", s"${JDBC_DRIVER}");
    // Fetch the projection list from the ASO operation
    val projList: List[Projection] = asoOp.getASOProjectionsList().asScala.toList;
    var fieldsList: List[FieldBase] = null
    // Create a list of source fields from the projection
    for (p <- projList) {
      for (ob <- p.getBaseOperations.asScala.toList) {
        if (ob.isInstanceOf[PlatformSink]) {
          val inp: OperationBase = ob.asInstanceOf[PlatformSink].getInputBaseOperation
          fieldsList = inp.asInstanceOf[Operation].getOperationFields.asScala.toList
        }
      }
    }
    // Create a list of native records (sources) from the projection
    for (p <- projList) {
      for (ob <- p.getBaseOperations.asScala.toList) {
        if (ob.isInstanceOf[NativeSource]) {
          if (ob.asInstanceOf[NativeSource].getNativeRecord.isInstanceOf[FlatRecord])
            nativeRecords += (ob.asInstanceOf[NativeSource].getNativeRecord).asInstanceOf[FlatRecord]
        }
      }
    }
    // Use only the first native record, assuming a single source
    var record: FlatRecord = nativeRecords(0);
    // Fetch the source table name
    tableName = record.getNativeName();
    // Get the connected fields for the source
    connectedFields = getConnectedFields(fieldsList);
    // Generate the read query using the connected fields and the adapter context
    query = getQuery(connectedFields, adpContext);
    // Use the logger for logging messages to the session log
    logger.log(Level.INFO, "Reader started");
    logger.log(Level.INFO, "The run-time engine uses the following SQL query to read data: " + query);
    // Create a Spark SQL context to read the data from the source
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    // Use sqlContext.read to fetch the data from the JDBC source
    val df_read: DataFrame = sqlContext.read.format("jdbc")
      .option("url", s"${connectionURL}")
      .option("driver", s"${JDBC_DRIVER}")
      .option("query", s"${query}")
      .option("user", s"${user}")
      .option("password", s"${password}")
      .load()
    logger.log(Level.INFO, "Reader completed");
    return df_read;
  }

  /*
   * This method writes data to the target using the data frame provided in the write call.
   * Sample write Scala code for MySQL is provided in this method.
   */
  override def write(sparkContext: SparkContext, adpContext: AdapterContext, dataFrame: DataFrame) = {
    // Initialize the MySQL JDBC driver
    Class.forName("com.mysql.jdbc.Driver");
    // Get the ASO operation
    var asoOp = adpContext.getASOOperation();
    // Fetch the connection attributes
    var connAttrs = adpContext.getConnectionAttributes();
    // Fetch the logger from the AdapterContext
    var logger = adpContext.getLogger();
    // Initialize all the connection attributes to variables
    var user: String = connAttrs.get("username").asInstanceOf[String];
    var password: String = connAttrs.get("password").asInstanceOf[String];
    var host: String = connAttrs.get("host").asInstanceOf[String];
    var port: Int = connAttrs.get("port").asInstanceOf[Int];
    var catalog: String = connAttrs.get("catalog").asInstanceOf[String];
    var connectionURL: String = connAttrs.get("connectionURL").asInstanceOf[String];
    // Use the logger for logging messages to the session log
    logger.log(Level.INFO, "Write scala method invoked");
    // Generate the MySQL JDBC URL using the connection attributes
    if (catalog != null && !catalog.isEmpty()) {
      connectionURL = "jdbc:mysql://" + host + ":" + port + "/" + catalog;
    } else {
      connectionURL = "jdbc:mysql://" + host + ":" + port;
    }
    // Create a properties object with all the connection attributes
    val connProps = new Properties
    connProps.put("user", s"${user}");
    connProps.put("password", s"${password}");
    connProps.put("Driver", s"${JDBC_DRIVER}");
    var record: FlatRecord = null
    // Fetch the projection list from the ASO operation
    val projList: List[Projection] = asoOp.getASOProjectionsList().asScala.toList;
    // Fetch the target record from the projection
    for (p <- projList) {
      for (ob <- p.getBaseOperations.asScala.toList) {
        if (ob.isInstanceOf[NativeSink]) {
          if (ob.asInstanceOf[NativeSink].getNativeRecord.isInstanceOf[FlatRecord])
            record = ob.asInstanceOf[NativeSink].getNativeRecord.asInstanceOf[FlatRecord]
        }
      }
    }
    // Fetch the target table name
    val targettablename: String = record.getNativeName();
    // Fetch the write capability attributes
    var writeCap = asoOp.getWriteCapabilityAttributes();
    // Sample code to fetch the write capability attribute extensions and their values
    val writeCapExt: KeyValueExtension = writeCap.getExtensions().asInstanceOf[KeyValueExtension];
    var keyMap: java.util.Map[String, Object] = writeCapExt.getAttributesMap();
    // Sample code to fetch the value for the TRUNCATE_TARGET write capability attribute
    isTruncateTarget = keyMap.get(TRUNCATE_TARGET).asInstanceOf[Boolean];
    logger.log(Level.INFO, "Writer Started");
    if (isTruncateTarget) {
      logger.log(Level.INFO, "The truncate table property is enabled.");
      // Use overwrite mode if truncate target is enabled
      dataFrame.write.mode("overwrite").jdbc(s"${connectionURL}", s"${targettablename}", connProps)
    } else {
      // Use append mode if truncate target is not enabled
      dataFrame.write.mode("append").jdbc(s"${connectionURL}", s"${targettablename}", connProps)
    }
    logger.log(Level.INFO, "Writer Completed");
  }

  /* This method gets the list of connected fields */
  def getConnectedFields(projectionFields: List[FieldBase]): ArrayList[FieldInfo] = {
    var i: Int = 0;
    var fields: ArrayList[FieldInfo] = new ArrayList[FieldInfo]();
    for (pfield <- projectionFields) {
      var f: FieldInfo = new FieldInfo(pfield, i);
      // Add each projection field to the list of connected fields
      fields.add(f);
      i += 1;
    }
    return fields;
  }

  /* This method generates the read query based on the connected fields */
  def getQuery(connectedFields: ArrayList[FieldInfo], adpContext: AdapterContext): String = {
    val query = new StringBuilder();
    query.append("SELECT ");
    var addComma: Boolean = false;
    val runtimeCtx = adpContext.getRuntimeContext();
    var i: Int = 0;
    for (field <- connectedFields.asScala) {
      if (addComma) {
        query.append(", ");
      }
      if (runtimeCtx.isFieldConnected(i)) {
        query.append("\"" + field.getField().getNativeSourceField().getNativeName() + "\"");
      } else {
        query.append("NULL as " + "\"" + field.getField().getNativeSourceField().getNativeName() + "\"");
      }
      addComma = true;
      i += 1;
    }
    query.append(" FROM " + tableName);
    return query.toString();
  }
}
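In the write method of this example, the truncateTargetTable write capability attribute selects between the overwrite and append save modes of the Spark DataFrameWriter. The following standalone sketch isolates that pattern; the connection URL, credentials, and table name are illustrative values that the connector normally reads from the connection attributes at run time.

import java.util.Properties
import org.apache.spark.sql.DataFrame

object JdbcWriteModeSketch {

  // Illustrative values; the connector resolves these from the connection attributes.
  val connectionURL = "jdbc:mysql://localhost:3306/testdb"
  val connProps = new Properties()
  connProps.put("user", "sample_user")
  connProps.put("password", "sample_password")
  connProps.put("Driver", "com.mysql.jdbc.Driver")

  def writeTable(dataFrame: DataFrame, targetTable: String, truncateTarget: Boolean): Unit = {
    // overwrite replaces the existing target rows; append only inserts new rows
    val saveMode = if (truncateTarget) "overwrite" else "append"
    dataFrame.write.mode(saveMode).jdbc(connectionURL, targetTable, connProps)
  }
}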
