INFAConnect
- INFAConnect
- All Products
package com.infa.adapter.msql.runtime.scalatask import java.util.Properties import com.informatica.sdk.adapter.metadata.projection.semantic.iface.Operation import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.NativeSink import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.PlatformSink import com.informatica.sdk.scalatask.dataframe.AdapterContext import com.informatica.sdk.scalatask.dataframe.DataFrameAdapter import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.ASOComplexType import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.ReadCapabilityAttributes import com.informatica.sdk.adapter.metadata.capabilityattribute.semantic.iface.WriteCapabilityAttributes import com.informatica.sdk.adapter.metadata.common.datasourceoperation.semantic.iface.Capability import com.informatica.sdk.adapter.metadata.common.datasourceoperation.semantic.iface.ReadCapability import com.informatica.sdk.adapter.metadata.patternblocks.field.semantic.iface.Field import com.informatica.sdk.adapter.metadata.patternblocks.flatrecord.semantic.iface.FlatRecord import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.BinaryOperatorEnum import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.Constant import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.DateConstant import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.DecimalConstant import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.FieldIdentifier import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.IntegerConstant import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.StringConstant import com.informatica.sdk.adapter.metadata.projection.expression.semantic.iface.SDKExpression import com.informatica.sdk.adapter.metadata.projection.filteroperation.semantic.iface.FilterOperation import com.informatica.sdk.adapter.metadata.projection.joinoperation.semantic.iface.JoinOperation import com.informatica.sdk.adapter.metadata.projection.projectionoperation.semantic.iface.ProjectionOperation import com.informatica.sdk.adapter.metadata.projection.projectionoperation.semantic.iface.SelectTypeEnum import com.informatica.sdk.adapter.metadata.projection.semantic.iface.FieldBase import com.informatica.sdk.adapter.metadata.projection.semantic.iface.NativeField import com.informatica.sdk.adapter.metadata.projection.semantic.iface.OperationBase import com.informatica.sdk.adapter.metadata.projection.simpleexpression.semantic.iface.SimpleBinaryExpression import com.informatica.sdk.adapter.metadata.projection.simpleexpression.semantic.iface.SimpleSDKExpression import com.informatica.sdk.adapter.metadata.projection.sortoperation.semantic.iface.SortOperation import com.informatica.sdk.adapter.metadata.projection.sortoperation.semantic.iface.SortOrderEnum import com.informatica.sdk.adapter.metadata.projection.sourceoperation.semantic.iface.NativeSource import com.informatica.sdk.scalatask.dataframe._ import java.text.MessageFormat import java.math.BigDecimal import java.math.BigInteger import java.text.SimpleDateFormat import java.util.ArrayList import java.util.logging.Logger import java.util.logging.Level import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.SparkContext import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.DateType import com.informatica.sdk.adapter.metadata.aso.semantic.iface.ASOOperation import scala.collection.JavaConverters._ import com.informatica.sdk.adapter.metadata.extension.semantic.iface.KeyValueExtension import com.informatica.sdk.adapter.metadata.projection.semantic.iface.Projection import org.apache.spark.sql.streaming.StreamingQuery import scala.collection.mutable.ListBuffer; // * This represents the spark pushdown specific class that can be extended by adapters to provide optimized spark pushdown for // * providing/consuming dataframes // */ class MSQLDataFrameAdapter extends DataFrameAdapter { private var JDBC_DRIVER ="com.mysql.jdbc.Driver"; private var asoOp: ASOOperation = null; private var logger: Logger = null; private var tableName: String = ""; private var connectedFields: ArrayList[FieldInfo] = null; private var isTruncateTarget: Boolean = false; private val TRUNCATE_TARGET: String = "truncateTargetTable"; private var nativeRecords = new ListBuffer[FlatRecord](); /* This method reads data from the external data source and returns a dataframe containing source data Sample read scala code for mysql is provided in this method */ override def read(sparkContext:SparkContext, adpContext: AdapterContext) : DataFrame={ //Initialize the mysql jdbc driver Class.forName("com.mysql.jdbc.Driver"); //Fetch the logger from the AdapterContext var logger = adpContext.getLogger(); //Get the asooperation var asoOp = adpContext.getASOOperation(); //Fetch te connection attributes var connAttrs = adpContext.getConnectionAttributes(); var query: String = null; //Initialize all the connection attributes to variables var user:String = connAttrs.get("username").asInstanceOf[String]; var password:String = connAttrs.get("password").asInstanceOf[String]; var host:String = connAttrs.get("host").asInstanceOf[String]; var port:Int = connAttrs.get("port").asInstanceOf[Int]; var catalog:String = connAttrs.get("catalog").asInstanceOf[String]; var connectionURL:String=connAttrs.get("connectionURL").asInstanceOf[String]; logger.log(Level.INFO, "Read scala method invoked"); //Generate the mysql jdbc url using connection attributes if (catalog !=null || !catalog.isEmpty()) { connectionURL= "jdbc:mysql://" + host + ":" + port + "/" + catalog; }else { connectionURL= "jdbc:mysql://" + host + ":" + port; val catalog=""; } //Create properties object with all the connection attributes val connProps = new Properties connProps.put("user",s"${user}"); connProps.put("password",s"${password}"); connProps.put("Driver",s"${JDBC_DRIVER}"); //Fetch the projection list from the asooperation val projList:List[Projection] = asoOp.getASOProjectionsList().asScala.toList; val p: Projection = null val ob: OperationBase = null var fieldsList: List[FieldBase] = null //Creates a list of source fields from the projection for (p <- projList) { for (ob <- p.getBaseOperations.asScala.toList) { if (ob.isInstanceOf[PlatformSink]) { val inp: OperationBase = ob.asInstanceOf[PlatformSink].getInputBaseOperation fieldsList = inp.asInstanceOf[Operation].getOperationFields.asScala.toList } } } //Creates a list of native records(sources) from the projection for (p <- projList) { for (ob <- p.getBaseOperations.asScala.toList) { if (ob.isInstanceOf[NativeSource]) { if(ob.asInstanceOf[NativeSource].getNativeRecord.isInstanceOf[FlatRecord]) nativeRecords += (ob.asInstanceOf[NativeSource].getNativeRecord).asInstanceOf[FlatRecord] } } } //Use only the first native record assuming single source var record:FlatRecord = nativeRecords(0); //Fetch the source table name tableName=record.getNativeName(); //Get the connected fields for the source connectedFields = getConnectedFields(fieldsList); //Generate read query using the connected fields and adapter context query = getQuery(connectedFields, adpContext); //Use the logger for logging messages to the session log logger.log(Level.INFO, "Reader started"); logger.log(Level.INFO, "The run-time engine uses the following SQL query to read data: " + query); //Create a spark sql context to read the data from the source val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext) //Use sqlContext.read to fetch the data from the jdbc source val df_read: DataFrame = sqlContext.read.format("jdbc").option("url", s"${connectionURL}").option("driver", s"${JDBC_DRIVER}").option("query", s"${query}").option("user", s"${user}").option("password",s"${password}").load() logger.log(Level.INFO, "Reader completed"); return df_read; } /* This method writes data from the target using the data frame provided in the write call Sample write scala code for mysql is provided in this method */ override def write(sparkContext:SparkContext, adpContext: AdapterContext, dataFrame: DataFrame)= { //Initialize the mysql jdbc driver Class.forName("com.mysql.jdbc.Driver"); //Get the asooperation var asoOp = adpContext.getASOOperation(); //Fetch te connection attributes var connAttrs = adpContext.getConnectionAttributes(); //Fetch the logger from the AdapterContext var logger = adpContext.getLogger(); //Initialize all the connection attributes to variables var user:String = connAttrs.get("username").asInstanceOf[String]; var password:String = connAttrs.get("password").asInstanceOf[String]; var host:String = connAttrs.get("host").asInstanceOf[String]; var port:Int = connAttrs.get("port").asInstanceOf[Int]; var catalog:String = connAttrs.get("catalog").asInstanceOf[String]; var connectionURL:String=connAttrs.get("connectionURL").asInstanceOf[String]; //Use the logger for logging messages to the session log logger.log(Level.INFO, "Write scala method invoked"); //Generate the mysql jdbc url using connection attributes if (catalog !=null || !catalog.isEmpty()) { connectionURL= "jdbc:mysql://" + host + ":" + port + "/" + catalog; }else { connectionURL= "jdbc:mysql://" + host + ":" + port; val catalog=""; } //Create properties object with all the connection attributes val connProps = new Properties connProps.put("user",s"${user}"); connProps.put("password",s"${password}"); connProps.put("Driver",s"${JDBC_DRIVER}"); var record:FlatRecord = null val p: Projection = null val ob: OperationBase = null //Fetch the projection list from the asooperation val projList:List[Projection] = asoOp.getASOProjectionsList().asScala.toList; //Fetch the target record from the projection for (p <- projList) { for (ob <- p.getBaseOperations.asScala.toList) { if (ob.isInstanceOf[NativeSink]) { if(ob.asInstanceOf[NativeSink].getNativeRecord.isInstanceOf[FlatRecord]) record = ob.asInstanceOf[NativeSink].getNativeRecord.asInstanceOf[FlatRecord] } } } //Fetch the target table name val targettablename:String =record.getNativeName(); //Fetch the write capability attributes var writeCap = asoOp.getWriteCapabilityAttributes(); //Sample code to fetch the write capability attribute extensions and their values val writeCapExt: KeyValueExtension = writeCap.getExtensions().asInstanceOf[KeyValueExtension]; var keyMap: java.util.Map[String, Object] = writeCapExt.getAttributesMap(); //Sample code to fetch the value for TRUNCATE_TARGET write capability attribute isTruncateTarget= keyMap.get(TRUNCATE_TARGET).asInstanceOf[Boolean]; logger.log(Level.INFO, "Writer Started"); if (isTruncateTarget) { logger.log(Level.INFO, "The truncate table property is enabled."); //Use overwrite mode if truncate target is enabled dataFrame.write.mode("overwrite").jdbc(s"${connectionURL}", s"${targettablename}", connProps) } else { //Use append mode if truncate target is not enabled dataFrame.write.mode("append").jdbc(s"${connectionURL}", s"${targettablename}", connProps) } logger.log(Level.INFO, "Writer Completed"); } /* This method gets the list of connected fields */ def getConnectedFields(projectionFields: List[FieldBase]): ArrayList[FieldInfo] = { var i: Int = 0; var fields: ArrayList[FieldInfo] = new ArrayList[FieldInfo](); var pfield = null; for (pfield <- projectionFields) { var fieldName=pfield.getName().toString(); var f: FieldInfo = new FieldInfo(pfield, i); i +=1; } return fields; } /* This method generates the read query based on the connected fields */ def getQuery(connectedFields: ArrayList[FieldInfo], adpContext: AdapterContext): String = { val query = new StringBuilder(); query.append("SELECT "); var addComma: Boolean = false; var field = null; val runtimeCtx = adpContext.getRuntimeContext(); var i: Int = 0; for (field <- connectedFields.asScala) { if (addComma) { query.append(", "); } if (runtimeCtx.isFieldConnected(i)) { query.append("\"" + field.getField().getNativeSourceField().getNativeName() + "\""); } else { query.append("NULL as " + "\"" + field.getField().getNativeSourceField().getNativeName() + "\""); } addComma = true; i += 1; } query.append(" FROM " + tableName); return query.toString(); } }