Descargar en PDF - Adictos al Trabajo

Avenida de Castilla,1 - Edificio Best Point - Oficina 21B
28830 San Fernando de Henares (Madrid)
tel./fax: +34 91 675 33 06
[email protected] - www.autentia.com
¿Qué ofrece Autentia Real
Business Solutions S.L?
Somos su empresa de Soporte a Desarrollo Informático.
Ese apoyo que siempre quiso tener...
1. Desarrollo de componentes y
proyectos a medida
2. Auditoría de código y recomendaciones de mejora
3. Arranque de proyectos basados en nuevas
tecnologías
1. Definición de frameworks corporativos.
2. Transferencia de conocimiento de nuevas arquitecturas.
3. Soporte al arranque de proyectos.
4. Auditoría preventiva periódica de calidad.
5. Revisión previa a la certificación de proyectos.
6. Extensión de capacidad de equipos de calidad.
7. Identificación de problemas en producción.
3a
RFP
Gran Empresa
Concurso
Verificación
previa
Consultora 1
Tecnología
Desarrollo
Sistemas
Producción
Consultora 2
Piloto
3b
Certificación
o Pruebas
Consultora 3
autentia
Equipo propio desarrollo
4. Cursos de formación (impartidos por desarrolladores en activo)
Spring MVC, JSF-PrimeFaces /RichFaces,
HTML5, CSS3, JavaScript-jQuery
Gestor portales (Liferay)
Gestor de contenidos (Alfresco)
Aplicaciones híbridas
Control de autenticación y
acceso (Spring Security)
UDDI
Web Services
Rest Services
Social SSO
SSO (Cas)
Tareas programadas (Quartz)
Gestor documental (Alfresco)
Inversión de control (Spring)
Compartimos nuestro conociemiento en:
www.adictosaltrabajo.com
JPA-Hibernate, MyBatis
Motor de búsqueda empresarial (Solr)
ETL (Talend)
Dirección de Proyectos Informáticos.
Metodologías ágiles
Patrones de diseño
TDD
BPM (jBPM o Bonita)
Generación de informes (JasperReport)
ESB (Open ESB)
Para más información visítenos en:
www.autentia.com
Entra en Adictos a través de
E-­mail
Contraseña
Registrarme
Olvidé mi contraseña
Entrar
Inicio
Quiénes somos
Formación
Comparador de salarios
Nuestros libros
Más
» Estás en: Inicio Tutoriales Primeros pasos con Apache Kafka
Juan Alonso Ramos
Consultor tecnológico de desarrollo de proyectos informáticos.
Catálogo de servicios
Autentia
Ingeniero en Informática, especialidad en Ingeniería del Software
Puedes encontrarme en Autentia: Ofrecemos de servicios soporte a desarrollo, factoría y
formación
Somos expertos en Java/J2EE
Ver todos los tutoriales del autor
GET
AHEAD
W IT H
W SO2.
Learn About the Latest IT Trends to Make Your Business Profitable!
Fecha de publicación del tutorial: 2014-­10-­13
Tutorial visitado 18 veces Descargar en PDF
Primeros pasos con Apache Kafka
0. Índice de contenidos.
1. Introducción.
2. Entorno.
3. Instalación.
4. Producer.
5. Consumer.
6. KafkaLog4jAppender.
7. Referencias.
8. Conclusiones.
1. Introducción.
Apache Kafka es un sistema de almacenamiento publicador/subscriptor distribuido, particionado y replicado. Estas
características, añadidas a que es muy rápido en lecturas y escrituras lo convierten en una herramienta excelente para
comunicar streams de información que se generan a gran velocidad y que deben ser gestionados por uno o varias
aplicaciones. Se destacan las siguientes características:
Funciona como un servicio de mensajería, categoriza los mensajes en topics.
Los procesos que publican se denominan brokers y los subscriptores son los consumidores de los topics.
Utiliza un protocolo propio basado en TCP y Apache Zookeeper para almacenar el estado de los brokers. Cada broker
mantiene un conjunto de particiones (primaria y secundaria) de cada topic.
Se pueden programar productores/consumidores en diferentes lenguajes: Java, Scala, Python, Ruby, C++ ...
Escalable y tolerante a fallos.
Se puede utilizar para servicios de mensajería (tipo ActiveMQ o RabbitMQ), procesamiento de streams, web tracking,
trazas operacionales, etc.
Escrito en Scala.
Creado por LinkedIn.
Síguenos a través
de:
Últimas Noticias
» Curso JBoss de Red Hat
» Si eres el responsable o líder
técnico, considérate
desafortunado. No puedes
culpar a nadie por ser gris
» Portales, gestores de
contenidos documentales y
desarrollos a medida
» Comentando el libro Start-­up
Nation, La historia del milagro
económico de Israel, de Dan
Senor & Salu Singer
» Screencasts de
programación narrados en
Español
Histórico de noticias
Últimos Tutoriales
En este tutorial vamos a ver una instalación básica en una máquina, crearemos un productor y un consumidor de topics y
configuraremos un appender de logs propio de Kafka que podría servirnos para un sistema de trazas operacionales.
Puedes descargarte el código del tutorial desde aquí.
» Solución de problemas
comunes con la integración de
maven en Eclipse Luna.
» Integración de SonarQube
en Eclipse.
2. Entorno.
» Monitorización de Apache
Tomcat con psi-­probe.
El tutorial se ha realizado con el siguiente entorno:
» Trident, un compañero de
viaje para tratar con Storm
MacBook Pro 15' (2.4 GHz Intel Core i5, 8GB DDR3 SDRAM).
» Cómo se trabaja con un
Sistema Operativo: Mac OS Mavericks 10.9.5
Oracle Java SDK 1.7.0_60
Apache Kafka 0.8.1.1
Croma y para qué sirve
Últimos Tutoriales del
Autor
3. Instalación
Lo primero será descargarnos la última versión desde la página web oficial, actualmente es la 0.8.1.1.
» Trident, un compañero de
viaje para tratar con Storm
Una vez descomprimido kafka, lo movemos al directorio que más nos guste, por ejemplo a /opt/kafka:
1
2
3
4
$ tar -­xvf kafka_2.10-­0.8.1.tgz $ mv kafka_2.10-­0.8.1 /opt
$ cd /opt
$ mv kafka_2_10-­0.8.1 kafka
?
» Primeros pasos con Neo4j
» Testing de Hadoop con
MRUnit
Tendremos que dar permisos de ejecución a los scripts dentro del directorio bin
1
2
$ cd bin
$ sudo chmod +x *
» Introducción a Apache Storm
?
» Introducción a Spring Data
Hadoop
Kafka necesitar Zookeeper para trabajar. Por defecto con la distribución viene uno para pruebas que arrancar una única
instancia. Lo arrancamos a través del script de arranque zookeeper-­server-­start.sh
bin/zookeeper-­server-­start.sh config/zookeeper.properties
1
2
3
4
5
6
7
8
9
10
11
Categorías del Tutorial
jalonso@MacBook-­Pro-­Juan-­Alonso:/opt/kafka$ bin/zookeeper-­server-­start.sh config/zookeeper
?
[2014-­10-­06 18:23:58,321] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.Quoru
[2014-­10-­06 18:23:58,343] WARN Either no config or no quorum defined in config, running Big Data
[2014-­10-­06 18:23:58,530] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.Quoru
[2014-­10-­06 18:23:58,531] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
Java Estándar
[2014-­10-­06 18:23:58,568] INFO Server environment:zookeeper.version=3.3.3-­1203054, built on 11
[2014-­10-­06 18:23:58,568] INFO Server environment:host.name=192.168.1.35 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-­10-­06 18:23:58,568] INFO Server environment:java.version=1.7.0_65 (org.apache.zookeeper.server.ZooKeeperServer)
[2014-­10-­06 18:23:58,568] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
...
[2014-­10-­06 18:23:58,711] INFO Snapshotting: 0 (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
En otra consola arrancamos el servidor de Kafka:
bin/kafka-­server-­start.sh config/server.properties
1
2
3
4
5
6
7
8
9
10
jalonso@MacBook-­Pro-­Juan-­Alonso:/opt/kafka$ bin/kafka-­server-­start.sh config/server.properties
?
[2014-­10-­06 18:24:46,305] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2014-­10-­06 18:24:46,459] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)
[2014-­10-­06 18:24:46,459] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)
[2014-­10-­06 18:24:46,459] INFO Property log.dirs is overridden to /tmp/kafka-­logs (kafka.utils.VerifiableProperties)
...
[2014-­10-­06 18:24:46,555] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)
[2014-­10-­06 18:24:46,558] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2014-­10-­06 18:24:46,575] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
...
Tanto la configuración de zookeeper como de kafka está en el directorio config donde se configuran los puertos de escucha,
directorio de almacenamiento por defecto, número de particiones por defecto, etc.
Zookeeper escucha en el puerto 2181 y almacena por defecto los datos en /tmp/zookeeper. Kafka escuha en el puerto 9092.
4. Producer.
Kafka dispone de un API Java para construir productores y consumidores de mensajes. El productor es muy sencillo,
únicamente se indica el servidor donde está corriendo Kafka y el topic por el que escribimos los mensajes:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.autentia.tutoriales;;
import java.util.Properties;;
import kafka.javaapi.producer.Producer;;
import kafka.producer.KeyedMessage;;
import kafka.producer.ProducerConfig;;
public class KafkaProducer {
private static final String KAFKA_SERVER = "localhost:9092";;
private final Producer<String, String> producer;;
public KafkaProducer() {
final Properties props = new Properties();;
props.put("metadata.broker.list", KAFKA_SERVER);;
props.put("serializer.class", "kafka.serializer.StringEncoder");;
producer = new Producer<String, String>(new ProducerConfig(props));;
}
public void send(String topic, String message) {
producer.send(new KeyedMessage<String, String>(topic, message));;
}
public void close() {
producer.close();;
}
public static void main(String[] args) {
new KafkaProducer().send("test", "esto es un test");;
}
}
?
5. Consumer.
Para ver de forma rápida si el producer está escribiendo en el topic indicado y le llega este mensaje a Kafka podemos arrancar
un consumer por línea de comandos:
1
2
$ bin/kafka-­console-­consumer.sh -­-­zookeeper localhost:2181 -­-­topic test
esto es un test
?
Ahora vamos a construirnos un Consumer mediante el API que nos proporciona Kafka.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.autentia.tutoriales;;
?
import java.nio.ByteBuffer;;
import java.util.HashMap;;
import java.util.Map;;
import kafka.api.FetchRequest;;
import kafka.api.FetchRequestBuilder;;
import kafka.api.PartitionOffsetRequestInfo;;
import kafka.common.TopicAndPartition;;
import kafka.javaapi.FetchResponse;;
import kafka.javaapi.OffsetRequest;;
import kafka.javaapi.consumer.SimpleConsumer;;
import kafka.message.MessageAndOffset;;
import org.apache.log4j.Logger;;
public class KafkaConsumer {
private static final Logger log = Logger.getLogger(KafkaConsumer.class);;
private static final int FETCH_SIZE = 100000;;
private static final int MAX_NUM_OFFSETS = 1;;
private static final int BUFFER_SIZE = 64 * 1024;;
private static final int TIMEOUT = 100000;;
private static final int PARTITION = 1;;
private static final int PORT = 9092;;
private static final String TOPIC = "test";;
private static final String BROKER = "localhost";;
private static final String CLIENT = "testClient";;
private final SimpleConsumer consumer;;
public KafkaConsumer() {
this.consumer = new SimpleConsumer(BROKER, PORT, TIMEOUT, BUFFER_SIZE, CLIENT);;
}
public void run() throws Exception {
long readOffset = getLastOffset(consumer, kafka.api.OffsetRequest.EarliestTime());;
//consumer never stops
while (true) {
final FetchRequest req = new FetchRequestBuilder().clientId(CLIENT).addFetch(TOPIC, PARTITION, readOffset, FETCH_SIZ
final FetchResponse fetchResponse = consumer.fetch(req);;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(TOPIC, PARTITION)) {
long currentOffset = messageAndOffset.offset();;
if (currentOffset < readOffset) {
continue;;
}
readOffset = messageAndOffset.nextOffset();;
final ByteBuffer payload = messageAndOffset.message().payload();;
final byte[] bytes = new byte[payload.limit()];;
payload.get(bytes);;
log.info("[" + messageAndOffset.offset() + "]: " + new String(bytes, "UTF-­8"
}
}
}
public static long getLastOffset(SimpleConsumer consumer, long whichTime) {
final TopicAndPartition topicAndPartition = new TopicAndPartition(TOPIC, PARTITION);;
final Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetReq
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, MAX_NUM_OFFSETS));;
final OffsetRequest offsetRequest = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), CLIENT);;
return consumer.getOffsetsBefore(offsetRequest).offsets(TOPIC, PARTITION)[0];;
}
public static void main(String args[]) {
try {
new KafkaConsumer().run();;
} catch (Exception e) {
log.error("Error:" + e);;
}
}
}
Levantamos el Producer con el mensaje enviado para el topic 'test' y lo que va saliendo por el log del consumer es el mensaje
que nos llega por estar suscritos al topic.
Creamos el consumer indicando el host y puerto donde está arrancado el broker. También se configuran parámetros para
indiar un timeout, el tamaño del buffer y un identificador para el consumer.
La gran velocidad en lecturas que tiene Kafka se debe a que los topics se leen a partir de un puntero que marca el offset
donde empiezan los mensajes. Es responsabilidad del consumer el mantenimiento de este offset.
La salida del consumer por consola devuelve lo mismo que por línea de comandos:
1
2014-­10-­09 23:55:26,989 INFO com.autentia.tutoriales.KafkaConsumer.run(KafkaConsumer.java:64) -­ [0]: esto es un ?
6. KafkaLog4jAppender.
Dada la velocidad con la que se escribe en Kafka, un posible caso de uso sería enviar a Kafka las trazas operacionales de
nuestra aplicación. De esta forma podriamos configurar consumers de logs que fueran procesándolos para detectar
problemas.
Kafka ya dispone de un Log4jAppender que nos hace todo el trabajo del productor de logs, únicamente tendríamos que
configurarlo en nuestro log4j.properties:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
log4j.rootLogger=INFO, KAFKA, stdout
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA.layout.ConversionPattern=%-­5p: %c -­ %m%n log4j.appender.KAFKA.Topic=logs
log4j.appender.KAFKA.BrokerList=localhost:9092
log4j.appender.KAFKA.ProducerType=async
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-­5p: %c -­ %m%n log4j.logger.com.autentia.tutoriales=INFO
?
Si lanzamos la clase KafkaConsumer vemos cómo en el topic 'logs' aparecen los logs de info que envía.
1
2
3
4
5
[0]: esto es un test
?
Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(logs)
Connected to localhost:9092 for producing
Disconnecting from localhost:9092
Connected to localhost:9092 for producing
7. Referencias.
http://kafka.apache.org/07/quickstart.html
http://www.slideshare.net/miguno/apache-­kafka-­08-­basic-­training-­verisign
8. Conclusiones.
Son muchos las compañias (LinkedIn, Twitter, Netflix, Square, Spotify, Pinterest, Uber, Tumblr y muchos más) las que utilizan
Apache Kafka para analizar tráfico de sus aplicaciones, como parte de su infraestructura de procesamiento de datos,
monitorización en tiempo real, como bus de eventos, etc.
En próximos tutoriales lo configuraremos para trabajar con Twitter y Storm.
Puedes descargarte el código del tutorial desde aquí.
Espero que te haya sido de ayuda.
Un saludo.
Juan
A continuación puedes evaluarlo:
Regístrate para evaluarlo
Por favor, vota +1 o compártelo si te pareció interesante
More
Share | Share
Share
Share
Share
Share
Share
0
Anímate y coméntanos lo que pienses sobre este TUTORIAL:
» Registrate y accede a esta y otras ventajas «
Esta obra está licenciada bajo licencia Creative Commons de Reconocimiento-­No comercial-­Sin obras derivadas 2.5
PUSH THIS
---no clicks
Page Pushers
Community
Help?
0 people brought clicks to this page
+
+
+
+
+
+
+
+
powered by karmacracy
Copyright 2003-­2014 © All Rights Reserved | Texto legal y condiciones de uso | Banners | Powered by Autentia | Contacto