In this blog post, we'll explore how to build a real-time CSV file generation system using Kafka consumers in a C# application. The goal is to configure up to 5 Kafka consumers, read data from a database table, and generate and upload a CSV file to an S3 bucket whenever an update operation occurs in the table. We'll walk through each step of the process and provide code snippets to illustrate the implementation.
1. Setting Up Kafka Consumers in appsettings.json
First, let's configure Kafka in the appsettings.json file. We'll define the shared bootstrap servers plus settings for up to 5 consumers, each with its own topic and group ID:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  },
  "KafkaConsumers": {
    "Consumer1": {
      "Topic": "topic1",
      "GroupId": "group1"
    },
    "Consumer2": {
      "Topic": "topic2",
      "GroupId": "group2"
    }
    // Add settings for Consumer3, Consumer4, and Consumer5 as needed
  }
}
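If you prefer strongly typed access to these settings, each consumer section can be bound to a small options class. This is an optional sketch, not something the rest of the post depends on: the KafkaConsumerSettings class is illustrative, and the Get<T>() binding requires the Microsoft.Extensions.Configuration.Binder package.
using Microsoft.Extensions.Configuration;

// Illustrative, strongly typed view of one entry under "KafkaConsumers".
public class KafkaConsumerSettings
{
    public string Topic { get; set; }
    public string GroupId { get; set; }
}

public static class KafkaConsumerSettingsLoader
{
    // Binds the KafkaConsumers:{consumerName} section (e.g. "Consumer1") to the class above.
    public static KafkaConsumerSettings Load(IConfiguration configuration, string consumerName)
    {
        return configuration
            .GetSection($"KafkaConsumers:{consumerName}")
            .Get<KafkaConsumerSettings>();
    }
}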
2. Implementing Kafka Consumer in C#
Next, we'll implement Kafka consumers in our C# application using a Kafka client library such as Confluent.Kafka. We'll create a KafkaConsumer class responsible for initializing and consuming messages from Kafka topics:
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using System;
using System.Threading;

public class KafkaConsumer
{
    private readonly IConfiguration _configuration;

    public KafkaConsumer(IConfiguration configuration)
    {
        _configuration = configuration;
    }

    public void ConsumeMessages(string consumerName)
    {
        // Read the per-consumer settings (Topic, GroupId) and the shared bootstrap servers.
        var section = _configuration.GetSection($"KafkaConsumers:{consumerName}");
        var topic = section["Topic"];
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _configuration["Kafka:BootstrapServers"],
            GroupId = section["GroupId"],
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
        {
            consumer.Subscribe(topic);

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var result = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received message: {result.Message.Value} from topic {result.Topic} at partition {result.Partition}, offset {result.Offset}");

                        // Process the received message and generate a CSV file.
                        GenerateCSV(result.Message.Value);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumer.Close();
            }
        }
    }

    private void GenerateCSV(string message)
    {
        // Logic to parse the message, fetch data from the database, and generate the CSV file.
        // This will vary based on the structure of your message and database schema.
    }
}
3. Database Connectivity and CSV Generation
To generate the CSV file based on data from the database table, we'll need to establish a database connection and implement logic to retrieve data and convert it into CSV format. Here's a simplified example using Entity Framework Core:
using Microsoft.EntityFrameworkCore;

public class DatabaseContext : DbContext
{
    public DbSet<YourEntity> YourEntities { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer("your_connection_string");
    }
}

public class YourEntity
{
    // Define properties corresponding to your database table columns
}
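Before wiring everything together, here is a minimal sketch of the CSV step itself, assuming the DatabaseContext above and illustrative Id, Name, and UpdatedAt properties on YourEntity (replace them with your real columns). The end-to-end example later in the post folds this logic into the consumer and adds the S3 upload.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

public static class CsvGenerator
{
    // Loads the current rows from the table and writes them to a CSV file;
    // returns the path of the generated file.
    public static string WriteCsv(string outputDirectory)
    {
        using (var dbContext = new DatabaseContext())
        {
            List<YourEntity> rows = dbContext.YourEntities.ToList();

            var csv = new StringBuilder();
            csv.AppendLine("Id,Name,UpdatedAt"); // illustrative header: match your columns

            foreach (var row in rows)
            {
                // Illustrative properties: replace with the ones defined on YourEntity.
                csv.AppendLine($"{row.Id},{row.Name},{row.UpdatedAt:O}");
            }

            string filePath = Path.Combine(outputDirectory, $"{Guid.NewGuid()}.csv");
            File.WriteAllText(filePath, csv.ToString());
            return filePath;
        }
    }
}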
4. Integrating Kafka Consumers into the Main Program
Finally, let's integrate the Kafka consumers into our main program:
using System;
using System.Linq;
using System.Threading;
using Microsoft.Extensions.Configuration;

class Program
{
    static void Main(string[] args)
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json")
            .Build();

        var consumerNames = configuration.GetSection("KafkaConsumers").GetChildren().Select(c => c.Key);

        foreach (var consumerName in consumerNames)
        {
            var kafkaConsumer = new KafkaConsumer(configuration);
            new Thread(() => kafkaConsumer.ConsumeMessages(consumerName)).Start();
        }

        Console.ReadLine(); // Keep the program running
    }
}
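As a side note, the same wiring can be done without managing raw threads. The sketch below is an optional alternative entry point that runs each configured consumer as a task and waits for all of them to finish after Ctrl+C triggers cancellation inside ConsumeMessages.
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;

class ProgramWithTasks
{
    static void Main(string[] args)
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json")
            .Build();

        // One task per configured consumer; WaitAll returns once every consume
        // loop has exited cleanly after cancellation.
        var tasks = configuration.GetSection("KafkaConsumers").GetChildren()
            .Select(section => Task.Run(() => new KafkaConsumer(configuration).ConsumeMessages(section.Key)))
            .ToArray();

        Task.WaitAll(tasks);
    }
}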
Below is an end-to-end C# code example for the described approach. Please note that this example assumes the use of Entity Framework Core for database operations, Confluent.Kafka for Kafka integration, and the AWS SDK for .NET for the S3 interaction. You'll need to install the corresponding NuGet packages for these dependencies.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Amazon;
using Amazon.S3;
using Amazon.S3.Transfer;

public class DatabaseContext : DbContext
{
    public DbSet<YourEntity> YourEntities { get; set; }

    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer("your_connection_string");
    }
}

public class YourEntity
{
    // Define properties corresponding to your database table columns
}

public class KafkaConsumer
{
    private readonly IConfiguration _configuration;
    private readonly string _topic;
    private readonly string _groupId;
    private readonly string _s3BucketName;

    public KafkaConsumer(IConfiguration configuration, string consumerName)
    {
        _configuration = configuration;
        _topic = configuration[$"KafkaConsumers:{consumerName}:Topic"];
        _groupId = configuration[$"KafkaConsumers:{consumerName}:GroupId"];
        _s3BucketName = configuration["S3BucketName"];
    }

    public void ConsumeMessages()
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _configuration["Kafka:BootstrapServers"],
            GroupId = _groupId,
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
        {
            consumer.Subscribe(_topic);

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var result = consumer.Consume(cts.Token);
                        Console.WriteLine($"Received message: {result.Message.Value} from topic {result.Topic} at partition {result.Partition}, offset {result.Offset}");
                        GenerateCSV(result.Message.Value);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumer.Close();
            }
        }
    }

    private void GenerateCSV(string message)
    {
        using (var dbContext = new DatabaseContext())
        {
            var data = dbContext.YourEntities.ToList();
            string csvContent = ConvertDataToCSV(data);

            string fileName = $"{Guid.NewGuid()}.csv";
            string filePath = Path.Combine(Path.GetTempPath(), fileName);
            File.WriteAllText(filePath, csvContent);

            UploadToS3(fileName, filePath);
        }
    }

    private string ConvertDataToCSV(List<YourEntity> data)
    {
        StringBuilder csvContent = new StringBuilder();

        // Append CSV header
        csvContent.AppendLine("Column1,Column2,Column3"); // Modify to match your column names

        // Append data rows
        foreach (var entity in data)
        {
            csvContent.AppendLine($"{entity.Property1},{entity.Property2},{entity.Property3}"); // Modify to match your property names
        }

        return csvContent.ToString();
    }

    private void UploadToS3(string fileName, string filePath)
    {
        var s3Client = new AmazonS3Client(RegionEndpoint.USWest2); // Modify region as per your setup
        var transferUtility = new TransferUtility(s3Client);

        try
        {
            transferUtility.Upload(filePath, _s3BucketName, fileName);
            Console.WriteLine($"Uploaded {fileName} to S3 bucket {_s3BucketName}");
        }
        catch (AmazonS3Exception e)
        {
            Console.WriteLine($"Error uploading to S3: {e.Message}");
        }
        finally
        {
            File.Delete(filePath); // Clean up the local file
        }
    }
}

class Program
{
    static void Main(string[] args)
    {
        var configuration = new ConfigurationBuilder()
            .AddJsonFile("appsettings.json")
            .Build();

        var consumerNames = configuration.GetSection("KafkaConsumers").GetChildren().Select(c => c.Key);

        foreach (var consumerName in consumerNames)
        {
            var kafkaConsumer = new KafkaConsumer(configuration, consumerName);
            new Thread(kafkaConsumer.ConsumeMessages).Start();
        }

        Console.ReadLine(); // Keep the program running
    }
}
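If you haven't pulled in the dependencies yet, the usual packages can be added with the dotnet CLI along these lines (use the EF Core provider that matches your database; the Binder package is only needed if you use the strongly typed settings sketch from step 1):
dotnet add package Confluent.Kafka
dotnet add package Microsoft.EntityFrameworkCore.SqlServer
dotnet add package AWSSDK.S3
dotnet add package Microsoft.Extensions.Configuration.Json
dotnet add package Microsoft.Extensions.Configuration.Binder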
In this example:
- Replace "your_connection_string" with your database connection string.
- Modify YourEntity class to match your database table schema.
- Ensure the Kafka and S3 bucket configurations in appsettings.json match your setup (a sample configuration follows this list).
- Customize the CSV header and data conversion logic in the ConvertDataToCSV method.
- Adjust the S3 bucket region in the AmazonS3Client constructor as per your configuration.
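For reference, an appsettings.json shape that matches the keys read by the code above (Kafka:BootstrapServers, KafkaConsumers:<name>:Topic and GroupId, and S3BucketName) might look like this, with placeholder values:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092"
  },
  "S3BucketName": "your-s3-bucket-name",
  "KafkaConsumers": {
    "Consumer1": { "Topic": "topic1", "GroupId": "group1" },
    "Consumer2": { "Topic": "topic2", "GroupId": "group2" },
    "Consumer3": { "Topic": "topic3", "GroupId": "group3" },
    "Consumer4": { "Topic": "topic4", "GroupId": "group4" },
    "Consumer5": { "Topic": "topic5", "GroupId": "group5" }
  }
}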
This code demonstrates the consumer side of the pipeline end to end: consuming the Kafka messages that signal database changes, generating CSV files from the table data, and uploading them to an S3 bucket.
Conclusion
By following these steps and implementing the outlined components, you can build a robust system that reacts to database change events delivered through Kafka, generates CSV files based on those changes, and uploads them to an S3 bucket. This keeps the exported CSV files in sync with the database in near real time.