Data Integration Connector Toolkit
- Data Integration Connector Toolkit
- All Products
import com.informatica.sdk.adapter.metadata.common.typesystem.typelibrary.semantic.iface.StructuralFeature; import com.informatica.sdk.adapter.metadata.projection.sinkoperation.semantic.iface.PlatformSink; import com.informatica.sdk.adapter.metadata.patternblocks.procedure.semantic.iface.Procedure;
private String procName=null; private List<StructuralFeature> sfList = null; private Procedure pr = null; private ArrayList<Parameter> inputParamList = null; private ArrayList<Parameter> outputParamList = null;
List<Capability> caps1 = m_asoOperation1.getCapabilities(); Capability cap1 = caps1.get(0); if (cap1 instanceof CallCapability) { Projection readProj = m_asoOperation1.getReadProjection(); NativeSource nativeSrcOp = (NativeSource) (readProj.getBaseOperations(OperationTypeEnum.NATIVE_SOURCE) .get(0)); Procedure m_nativeProcedure = (Procedure) ((nativeSrcOp).getNativeRecord()); PlatformSink platformSink = (PlatformSink) (readProj.getBaseOperations(OperationTypeEnum.PLATFORM_SINK).get(0)); this.sfList = platformSink.getComplexType().getStructuralFeatures(); this.pr=m_nativeProcedure; this.procName = m_nativeProcedure.getName(); this.inputParamList = new ArrayList<Parameter>(); this.outputParamList = new ArrayList<Parameter>(); List<com.informatica.sdk.adapter.metadata.field.semantic.iface.FieldBase> paramList = m_nativeProcedure.getFieldList(); for (com.informatica.sdk.adapter.metadata.field.semantic.iface.FieldBase fieldBase : paramList) { Parameter param = (Parameter)fieldBase; ParameterTypeEnum type = param.getParameterTypeEnum(); switch(type) { case INOUT_TYPE: this.inputParamList.add(param); this.outputParamList.add(param); break; case IN_TYPE: this.inputParamList.add(param); break; case OUT_TYPE: this.outputParamList.add(param); break; default: } } return EReturnStatus.SUCCESS; }
@Override public int call(DataSession dataSession, CallAttributes callAttr) throws SDKException{ InfaUtils pInfaUtils = dataSession.getInfaUtilsHandle(); logger = pInfaUtils.getLogger(); MySQL_CloudTableDataConnection conn = (MySQL_CloudTableDataConnection) dataSession.getConnection(); Connection nativeConn = (Connection) conn.getNativeConnection(); int paramListSize = pr.getFieldList().size(); String procQuery = "{CALL " + procName + "("; for (int i=0 ; i < paramListSize; i++) { if(i+1==paramListSize) { procQuery=procQuery+ "?"; break; } procQuery=procQuery+ "?,"; } procQuery = procQuery + ")}"; CallableStatement callStmt = null; int col = 0; int rowSize = callAttr.getNumInputRows(); List<List<Object>> result = new ArrayList<List<Object>>(); String stringValue = null; BigDecimal bdc=null; Timestamp ts=null; Date dt=null; Time time=null; double doubleValue; float floatValue; long longValue; int intValue; try { for(int i=0; i < rowSize; i++) { col=0; callStmt = nativeConn.prepareCall(procQuery); for (com.informatica.sdk.adapter.metadata.field.semantic.iface.FieldBase fieldBase : pr .getFieldList()) { Parameter param = (Parameter) fieldBase; String dataType = param.getDataType(); DataAttributes dataAttr = new DataAttributes(); switch (dataType) { case "TINYINT" : case "SMALLINT" : case "MEDIUMINT": case "INT": if(param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col + 1,Types.INTEGER); } else { intValue = Integer.parseInt(stringValue); callStmt.setInt(col + 1, intValue); } } else { if(dataType.equalsIgnoreCase("TINYINT")) { callStmt.registerOutParameter(col+1, Types.TINYINT); } else if(dataType.equalsIgnoreCase("SMALLINT")) { callStmt.registerOutParameter(col+1, Types.SMALLINT); } else { callStmt.registerOutParameter(col+1, Types.INTEGER); } } break; case "BIGINT": if(param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col + 1,Types.BIGINT); } else { longValue = Long.parseLong(stringValue); callStmt.setLong(col + 1, longValue); } } else { callStmt.registerOutParameter(col+1, Types.BIGINT); } break; case "DOUBLE": if(param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col + 1,Types.DOUBLE); } else { doubleValue = Double.parseDouble(stringValue); callStmt.setDouble(col + 1, doubleValue); } } else { callStmt.registerOutParameter(col+1, Types.BIGINT); } break; case "FLOAT": if(param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col + 1,Types.FLOAT); } else { floatValue = Float.parseFloat(stringValue); callStmt.setFloat(col + 1, floatValue); } } else { callStmt.registerOutParameter(col+1, Types.BIGINT); } break; case "BIT": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue.equalsIgnoreCase("true")) stringValue="1"; else stringValue="0"; if(stringValue==null) { callStmt.setNull(col+1, Types.BIT); } else callStmt.setString(col+1, stringValue); } else { callStmt.registerOutParameter(col+1, Types.BIT); } break; case "CHAR": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col+1, Types.CHAR); } else callStmt.setString(col+1, stringValue); } else { callStmt.registerOutParameter(col+1, Types.CHAR); } break; case "VARCHAR": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); stringValue = dataSession.getStringData(dataAttr); if(stringValue==null) { callStmt.setNull(col+1, Types.VARCHAR); } else callStmt.setString(col+1, stringValue); } else { callStmt.registerOutParameter(col+1, Types.VARCHAR); } break; case "DATETIME": case "TIMESTAMP": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); ts = dataSession.getDateTimeData(dataAttr); if(ts==null) { callStmt.setNull(col+1, Types.TIMESTAMP); } else callStmt.setTimestamp(col+1,ts); } else { callStmt.registerOutParameter(col+1, Types.TIMESTAMP); } break; case "DATE": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); ts = dataSession.getDateTimeData(dataAttr); if(ts==null) { callStmt.setNull(col+1, Types.DATE); } else { dt = new Date(ts.getTime()); callStmt.setDate(col+1,dt); } } else { callStmt.registerOutParameter(col+1, Types.DATE); } break; case "TIME": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); ts = dataSession.getDateTimeData(dataAttr); if(ts==null) { callStmt.setNull(col+1, Types.TIME); } else { time = new Time(ts.getTime()); callStmt.setTime(col+1,time); } } else { callStmt.registerOutParameter(col+1, Types.TIME); } break; case "DECIMAL": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); bdc=dataSession.getBigDecimalData(dataAttr); if(bdc==null) { callStmt.setNull(col+1, Types.DECIMAL); } else callStmt.setBigDecimal(col+1,bdc); } else { callStmt.registerOutParameter(col+1, Types.DECIMAL); } break; case "NUMERIC": if (param.getParameterTypeEnum()==ParameterTypeEnum.IN_TYPE || param.getParameterTypeEnum()==ParameterTypeEnum.INOUT_TYPE) { dataAttr.setRowIndex(i); dataAttr.setColumnIndex(col); bdc=dataSession.getBigDecimalData(dataAttr); if(bdc==null) { callStmt.setNull(col+1, Types.NUMERIC); } else callStmt.setBigDecimal(col+1,bdc); } else { callStmt.registerOutParameter(col+1, Types.NUMERIC); } break; default : logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,"MySQL Stored Procedure encountered failure for the field " + param.getNativeName() + " with the value " + dataSession.getStringData(dataAttr)); logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE,"MySQL Stored Procedure is not supported for the datatype "+dataType); return EReturnStatus.FAILURE; } col++; } callStmt.executeQuery(); List<Object> resultRow = new ArrayList<Object>(); for (Parameter outParam : this.outputParamList) { Object val=null; switch(outParam.getDataType().toUpperCase()) { case "TIMESTAMP": val = callStmt.getTimestamp(outParam.getName()); resultRow.add(val); break; case "DATE": val= callStmt.getDate(outParam.getName()); resultRow.add(val); break; case "TIME": val = callStmt.getTime(outParam.getName()); resultRow.add(val); break; default: val = callStmt.getObject(outParam.getName()); resultRow.add(val); break; } } result.add(resultRow); } setProcDataToPlatform(dataSession,result); callAttr.setNumRowsInOutputBuffer(result.size()); if(callStmt!=null) callStmt.close(); } catch (SQLException e) { e.printStackTrace(); return EReturnStatus.FAILURE; } return EReturnStatus.NO_MORE_DATA; }
//** * Sets the multiple row data in the data table to the data session buffer * * <pre> * ################################## * AUTOGENERATED CODE * ################################## * </pre> * * @param dataSession * The dataSession instance, which is the container for SDK * handles. * @param dataTable * List of List of Objects. Each List of Objects represents a * single row. */ public void setProcDataToPlatform(DataSession dataSession, List<List<Object>> dataTable) throws SDKException { for (int row = 0; row < dataTable.size(); row++) { List<Object> rowData = dataTable.get(row); for (int col = 0; col < dataTable.get(0).size(); col++) { DataAttributes pDataAttributes = new DataAttributes(); pDataAttributes.setDataSetId(0); pDataAttributes.setColumnIndex(col); pDataAttributes.setRowIndex(row); Object data = rowData.get(col); String dataType = sfList.get(col).getDataType(); String columnName = sfList.get(col).getName(); if (dataType.equalsIgnoreCase("string") || dataType.equalsIgnoreCase("text")) { if (data == null) { pDataAttributes.setLength(0); dataSession.setStringData((String) data, pDataAttributes); } else { String text = data.toString(); int fieldLength = sfList.get(col).getPrecision(); if (text.length() > fieldLength) { pDataAttributes.setLength(fieldLength); pDataAttributes.setIndicator(EIndicator.TRUNCATED); dataSession.setStringData(text.substring(0, fieldLength), pDataAttributes); } else { pDataAttributes.setLength(text.length()); pDataAttributes.setIndicator(EIndicator.VALID); } dataSession.setStringData(text, pDataAttributes); } } else if (dataType.compareToIgnoreCase("double") == 0) { if (data instanceof Double) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).doubleValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setDoubleData((Double) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("float") == 0) { if (data instanceof Float) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).floatValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setFloatData((Float) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("long") == 0) { if (data instanceof Long) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).longValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setLongData((Long) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("short") == 0) { if (data instanceof Short) pDataAttributes.setIndicator(EIndicator.VALID); else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).shortValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setShortData((Short) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("integer") == 0) { if (data instanceof Integer) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).intValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setIntData((Integer) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("bigint") == 0) { if (data instanceof Long) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Number) { pDataAttributes.setIndicator(EIndicator.VALID); data = ((Number) data).longValue(); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "] " + "should be a of type [" + Number.class.getName() + "] or its sub-types."); data = null; } dataSession.setLongData((Long) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("date/time") == 0) { if (data instanceof Timestamp) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data instanceof Date) { pDataAttributes.setIndicator(EIndicator.VALID); Timestamp ts = new Timestamp(((Date) data).getTime()); data = ts; } else if (data instanceof Time) { pDataAttributes.setIndicator(EIndicator.VALID); Timestamp ts = new Timestamp(((Time) data).getTime()); data = ts; } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_ERROR, ELogLevel.TRACE_NONE, "Data for column [" + columnName + "] of type [" + dataType + "]" + " should be a of type [" + Timestamp.class.getName() + "]."); data = null; } dataSession.setDateTimeData((Timestamp) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("binary") == 0) { if (data instanceof byte[]) { pDataAttributes.setLength(((byte[]) data).length); pDataAttributes.setIndicator(EIndicator.VALID); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA, "Data for type [" + dataType + "] should be a of type [" + byte[].class.getName() + "]."); data = null; } dataSession.setBinaryData((byte[]) data, pDataAttributes); } else if (dataType.compareToIgnoreCase("decimal") == 0) { if (data instanceof BigDecimal) { pDataAttributes.setIndicator(EIndicator.VALID); } else if (data == null) { pDataAttributes.setIndicator(EIndicator.NULL); } else { logger.logMessage(EMessageLevel.MSG_DEBUG, ELogLevel.TRACE_VERBOSE_DATA, "Data for type [" + dataType + "] should be a of type [" + BigDecimal.class.getName() + "]."); data = null; } dataSession.setBigDecimalData((BigDecimal) data, pDataAttributes); } } } }